HADOOP-14445. Delegation tokens are not shared between KMS instances. Contributed by Xiao Chen and Rushabh S Shah.
This commit is contained in:
parent
e81397545a
commit
583fa6ed48
@ -36,8 +36,9 @@
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.util.HttpExceptionUtils;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
@ -82,6 +83,8 @@
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT;
|
||||
import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
|
||||
import static org.apache.hadoop.util.KMSUtil.checkNotNull;
|
||||
import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
|
||||
@ -96,16 +99,13 @@
|
||||
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
|
||||
|
||||
private static final Logger LOG =
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSClientProvider.class);
|
||||
|
||||
private static final String INVALID_SIGNATURE = "Invalid signature";
|
||||
|
||||
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
|
||||
|
||||
public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR;
|
||||
public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND;
|
||||
|
||||
public static final String SCHEME_NAME = "kms";
|
||||
|
||||
private static final String UTF8 = "UTF-8";
|
||||
@ -133,12 +133,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
private static final ObjectWriter WRITER =
|
||||
new ObjectMapper().writerWithDefaultPrettyPrinter();
|
||||
|
||||
/* dtService defines the token service value for the kms token.
|
||||
* The value can be legacy format which is ip:port format or it can be uri.
|
||||
* If it's uri format, then the value is read from
|
||||
* CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key
|
||||
* provider creation time, and set to token's Service field.
|
||||
* When a token is renewed / canceled, its Service field will be used to
|
||||
* instantiate a KeyProvider, eliminating the need to read configs
|
||||
* at that time.
|
||||
*/
|
||||
private final Text dtService;
|
||||
|
||||
// Allow fallback to default kms server port 9600 for certain tests that do
|
||||
// not specify the port explicitly in the kms provider url.
|
||||
@VisibleForTesting
|
||||
public static volatile boolean fallbackDefaultPortForTesting = false;
|
||||
private final boolean copyLegacyToken;
|
||||
|
||||
private class EncryptedQueueRefiller implements
|
||||
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
|
||||
@ -162,66 +167,6 @@ public void fillQueueForKey(String keyName,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The KMS implementation of {@link TokenRenewer}.
|
||||
*/
|
||||
public static class KMSTokenRenewer extends TokenRenewer {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSTokenRenewer.class);
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Renewing delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
|
||||
"null" : keyProvider.getClass());
|
||||
return 0;
|
||||
}
|
||||
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).renewDelegationToken(token);
|
||||
} finally {
|
||||
if (keyProvider != null) {
|
||||
keyProvider.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Canceling delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
|
||||
"null" : keyProvider.getClass());
|
||||
return;
|
||||
}
|
||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).cancelDelegationToken(token);
|
||||
} finally {
|
||||
if (keyProvider != null) {
|
||||
keyProvider.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
|
||||
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
|
||||
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
|
||||
@ -281,13 +226,13 @@ public KeyProvider createProvider(URI providerUri, Configuration conf)
|
||||
}
|
||||
hostsPart = t[0];
|
||||
}
|
||||
return createProvider(conf, origUrl, port, hostsPart);
|
||||
return createProvider(conf, origUrl, port, hostsPart, providerUri);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private KeyProvider createProvider(Configuration conf,
|
||||
URL origUrl, int port, String hostsPart) throws IOException {
|
||||
private KeyProvider createProvider(Configuration conf, URL origUrl,
|
||||
int port, String hostsPart, URI providerUri) throws IOException {
|
||||
String[] hosts = hostsPart.split(";");
|
||||
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
|
||||
for (int i = 0; i < hosts.length; i++) {
|
||||
@ -295,7 +240,7 @@ private KeyProvider createProvider(Configuration conf,
|
||||
providers[i] =
|
||||
new KMSClientProvider(
|
||||
new URI("kms", origUrl.getProtocol(), hosts[i], port,
|
||||
origUrl.getPath(), null, null), conf);
|
||||
origUrl.getPath(), null, null), conf, providerUri);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException("Could not instantiate KMSProvider.", e);
|
||||
}
|
||||
@ -353,17 +298,10 @@ public HttpURLConnection configure(HttpURLConnection conn)
|
||||
}
|
||||
}
|
||||
|
||||
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws
|
||||
IOException {
|
||||
super(conf);
|
||||
kmsUrl = createServiceURL(extractKMSPath(uri));
|
||||
int kmsPort = kmsUrl.getPort();
|
||||
if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
|
||||
kmsPort = 9600;
|
||||
}
|
||||
|
||||
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
|
||||
dtService = SecurityUtil.buildTokenService(addr);
|
||||
|
||||
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
|
||||
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
try {
|
||||
@ -376,6 +314,9 @@ public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
|
||||
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
|
||||
authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
|
||||
copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY,
|
||||
KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT);
|
||||
|
||||
configurator = new TimeoutConnConfigurator(timeout, sslFactory);
|
||||
encKeyVersionQueue =
|
||||
new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
|
||||
@ -400,6 +341,7 @@ public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
|
||||
new EncryptedQueueRefiller());
|
||||
authToken = new DelegationTokenAuthenticatedURL.Token();
|
||||
dtService = new Text(providerUri.toString());
|
||||
LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
|
||||
" created.", kmsUrl, dtService);
|
||||
}
|
||||
@ -473,7 +415,7 @@ private HttpURLConnection createConnection(final URL url, String method)
|
||||
@Override
|
||||
public HttpURLConnection run() throws Exception {
|
||||
DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createKMSAuthenticatedURL();
|
||||
return authUrl.openConnection(url, authToken, doAsUser);
|
||||
}
|
||||
});
|
||||
@ -924,7 +866,7 @@ public long renewDelegationToken(final Token<?> dToken) throws IOException {
|
||||
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
|
||||
token, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createKMSAuthenticatedURL();
|
||||
return getActualUgi().doAs(
|
||||
new PrivilegedExceptionAction<Long>() {
|
||||
@Override
|
||||
@ -956,7 +898,7 @@ public Void run() throws Exception {
|
||||
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
|
||||
dToken, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createKMSAuthenticatedURL();
|
||||
authUrl.cancelDelegationToken(url, token, doAsUser);
|
||||
return null;
|
||||
}
|
||||
@ -1008,6 +950,17 @@ private DelegationTokenAuthenticatedURL.Token generateDelegationToken(
|
||||
return token;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() {
|
||||
return new DelegationTokenAuthenticatedURL(configurator) {
|
||||
@Override
|
||||
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
getDelegationToken(URL url, Credentials creds) {
|
||||
return selectKMSDelegationToken(creds);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
@ -1016,7 +969,7 @@ public Token<?>[] addDelegationTokens(final String renewer,
|
||||
if (token == null) {
|
||||
final URL url = createURL(null, null, null, null);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
createKMSAuthenticatedURL();
|
||||
try {
|
||||
final String doAsUser = getDoAsUser();
|
||||
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||
@ -1030,9 +983,16 @@ public Token<?> run() throws Exception {
|
||||
}
|
||||
});
|
||||
if (token != null) {
|
||||
LOG.debug("New token received: ({})", token);
|
||||
if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) {
|
||||
// do not set service for legacy kind, for compatibility.
|
||||
token.setService(dtService);
|
||||
}
|
||||
LOG.info("New token created: ({})", token);
|
||||
credentials.addToken(token.getService(), token);
|
||||
tokens = new Token<?>[] { token };
|
||||
Token<?> legacyToken = createAndAddLegacyToken(credentials, token);
|
||||
tokens = legacyToken == null ?
|
||||
new Token<?>[] {token} :
|
||||
new Token<?>[] {token, legacyToken};
|
||||
} else {
|
||||
throw new IOException("Got NULL as delegation token");
|
||||
}
|
||||
@ -1049,13 +1009,75 @@ public Token<?> run() throws Exception {
|
||||
return tokens;
|
||||
}
|
||||
|
||||
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
|
||||
// Add existing credentials from the UGI, since provider is cached.
|
||||
Credentials creds = ugi.getCredentials();
|
||||
/**
|
||||
* If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY}
|
||||
* is true when creating the provider, then copy the passed-in token of
|
||||
* {@link KMSDelegationToken#TOKEN_KIND} and create a new token of
|
||||
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials.
|
||||
*
|
||||
* @return The legacy token, or null if one should not be created.
|
||||
*/
|
||||
private Token<?> createAndAddLegacyToken(Credentials credentials,
|
||||
Token<?> token) {
|
||||
if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND
|
||||
.equals(token.getKind())) {
|
||||
LOG.debug("Not creating legacy token because copyLegacyToken={}, "
|
||||
+ "token={}", copyLegacyToken, token);
|
||||
return null;
|
||||
}
|
||||
// copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same
|
||||
// underlying token for backwards-compatibility. Old clients/renewers
|
||||
// does not parse the new token and can only work with kms-dt.
|
||||
final Token<?> legacyToken = token.copyToken();
|
||||
legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND);
|
||||
final InetSocketAddress addr =
|
||||
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
|
||||
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
|
||||
legacyToken.setService(fallBackServiceText);
|
||||
LOG.info("Copied token to legacy kind: {}", legacyToken);
|
||||
credentials.addToken(legacyToken.getService(), legacyToken);
|
||||
return legacyToken;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Text getDelegationTokenService() {
|
||||
return dtService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a list of tokens, return the token that should be used for KMS
|
||||
* authentication.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
Token selectKMSDelegationToken(Credentials creds) {
|
||||
// always look for TOKEN_KIND first
|
||||
final TokenSelector<AbstractDelegationTokenIdentifier> tokenSelector =
|
||||
new AbstractDelegationTokenSelector<AbstractDelegationTokenIdentifier>(
|
||||
KMSDelegationToken.TOKEN_KIND) {
|
||||
};
|
||||
Token token = tokenSelector.selectToken(dtService, creds.getAllTokens());
|
||||
LOG.debug("Searching service {} found token {}", dtService, token);
|
||||
if (token != null) {
|
||||
return token;
|
||||
}
|
||||
|
||||
// fall back to look for token by service, regardless of kind.
|
||||
// this is old behavior, keeping for compatibility reasons (for example,
|
||||
// even if KMS server is new, if the job is submitted with an old kms
|
||||
// client, job runners on new version should be able to find the token).
|
||||
final InetSocketAddress addr =
|
||||
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
|
||||
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
|
||||
token = creds.getToken(fallBackServiceText);
|
||||
LOG.debug("Selected delegation token {} using service:{}", token,
|
||||
fallBackServiceText);
|
||||
return token;
|
||||
}
|
||||
|
||||
private boolean containsKmsDt(UserGroupInformation ugi) {
|
||||
final Credentials creds = ugi.getCredentials();
|
||||
if (!creds.getAllTokens().isEmpty()) {
|
||||
LOG.debug("Searching for token that matches service: {}", dtService);
|
||||
org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
dToken = creds.getToken(dtService);
|
||||
final Token dToken = selectKMSDelegationToken(creds);
|
||||
if (dToken != null) {
|
||||
return true;
|
||||
}
|
||||
|
@ -27,7 +27,10 @@
|
||||
@InterfaceAudience.Private
|
||||
public final class KMSDelegationToken {
|
||||
|
||||
public static final String TOKEN_KIND_STR = "kms-dt";
|
||||
public static final String TOKEN_LEGACY_KIND_STR = "kms-dt";
|
||||
public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR);
|
||||
|
||||
public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN";
|
||||
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
|
||||
|
||||
// Utility class is not supposed to be instantiated.
|
||||
@ -49,4 +52,21 @@ public Text getKind() {
|
||||
return TOKEN_KIND;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* DelegationTokenIdentifier used for the KMS for legacy tokens.
|
||||
*/
|
||||
@Deprecated
|
||||
public static class KMSLegacyDelegationTokenIdentifier
|
||||
extends DelegationTokenIdentifier {
|
||||
|
||||
public KMSLegacyDelegationTokenIdentifier() {
|
||||
super(TOKEN_LEGACY_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return TOKEN_LEGACY_KIND;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,56 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||
|
||||
/**
|
||||
* The {@link KMSTokenRenewer} that supports legacy tokens.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@Deprecated
|
||||
public class KMSLegacyTokenRenewer extends KMSTokenRenewer {
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_LEGACY_KIND);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a key provider for token renewal / cancellation.
|
||||
* Caller is responsible for closing the key provider.
|
||||
*/
|
||||
@Override
|
||||
protected KeyProvider createKeyProvider(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
assert token.getKind().equals(TOKEN_LEGACY_KIND);
|
||||
// Legacy tokens get service from configuration.
|
||||
return KMSUtil.createKeyProvider(conf,
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
}
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||
|
||||
/**
|
||||
* The KMS implementation of {@link TokenRenewer}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class KMSTokenRenewer extends TokenRenewer {
|
||||
|
||||
public static final Logger LOG = LoggerFactory
|
||||
.getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class);
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Renewing delegation token {}", token);
|
||||
final KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot renew token {}.",
|
||||
keyProvider == null ? "null" : keyProvider.getClass(), token);
|
||||
return 0;
|
||||
}
|
||||
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).renewDelegationToken(token);
|
||||
} finally {
|
||||
if (keyProvider != null) {
|
||||
keyProvider.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Canceling delegation token {}", token);
|
||||
final KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||
try {
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot cancel token {}.",
|
||||
keyProvider == null ? "null" : keyProvider.getClass(), token);
|
||||
return;
|
||||
}
|
||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).cancelDelegationToken(token);
|
||||
} finally {
|
||||
if (keyProvider != null) {
|
||||
keyProvider.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a key provider for token renewal / cancellation.
|
||||
* Caller is responsible for closing the key provider.
|
||||
*/
|
||||
protected KeyProvider createKeyProvider(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
return KMSUtil
|
||||
.createKeyProviderFromTokenService(conf, token.getService().toString());
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms;
|
@ -770,6 +770,16 @@ public class CommonConfigurationKeysPublic {
|
||||
/** Default value is 100 ms. */
|
||||
public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100;
|
||||
|
||||
/**
|
||||
* @see
|
||||
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||
* core-default.xml</a>
|
||||
*/
|
||||
public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY =
|
||||
"hadoop.security.kms.client.copy.legacy.token";
|
||||
/** Default value is true. */
|
||||
public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true;
|
||||
|
||||
/**
|
||||
* @see
|
||||
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||
|
@ -300,11 +300,7 @@ public HttpURLConnection openConnection(URL url, Token token, String doAs)
|
||||
creds.getAllTokens());
|
||||
}
|
||||
if (!creds.getAllTokens().isEmpty()) {
|
||||
InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
|
||||
url.getPort());
|
||||
Text service = SecurityUtil.buildTokenService(serviceAddr);
|
||||
dToken = creds.getToken(service);
|
||||
LOG.debug("Using delegation token {} from service:{}", dToken, service);
|
||||
dToken = getDelegationToken(url, creds);
|
||||
if (dToken != null) {
|
||||
if (useQueryStringForDelegationToken()) {
|
||||
// delegation token will go in the query string, injecting it
|
||||
@ -340,6 +336,21 @@ public HttpURLConnection openConnection(URL url, Token token, String doAs)
|
||||
return conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Select a delegation token from all tokens in credentials, based on url.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||
getDelegationToken(URL url, Credentials creds) {
|
||||
final InetSocketAddress serviceAddr =
|
||||
new InetSocketAddress(url.getHost(), url.getPort());
|
||||
final Text service = SecurityUtil.buildTokenService(serviceAddr);
|
||||
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
|
||||
creds.getToken(service);
|
||||
LOG.debug("Selected delegation token {} using service:{}", dToken, service);
|
||||
return dToken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests a delegation token using the configured <code>Authenticator</code>
|
||||
* for authentication.
|
||||
|
@ -81,7 +81,7 @@
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class DelegationTokenAuthenticationHandler
|
||||
implements AuthenticationHandler {
|
||||
private static final Logger LOG =
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
|
||||
|
||||
protected static final String TYPE_POSTFIX = "-dt";
|
||||
@ -224,7 +224,8 @@ public boolean managementOperation(AuthenticationToken token,
|
||||
HttpServletRequest request, HttpServletResponse response)
|
||||
throws IOException, AuthenticationException {
|
||||
boolean requestContinues = true;
|
||||
LOG.trace("Processing operation for req=({}), token: {}", request, token);
|
||||
LOG.trace("Processing operation for req=({}), token: {}",
|
||||
request.getRequestURL(), token);
|
||||
String op = ServletUtils.getParameter(request,
|
||||
KerberosDelegationTokenAuthenticator.OP_PARAM);
|
||||
op = (op != null) ? StringUtils.toUpperCase(op) : null;
|
||||
@ -407,7 +408,8 @@ public AuthenticationToken authenticate(HttpServletRequest request,
|
||||
HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
|
||||
}
|
||||
} else {
|
||||
LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request);
|
||||
LOG.debug("Falling back to {} (req={})", authHandler.getClass(),
|
||||
request.getRequestURL());
|
||||
token = authHandler.authenticate(request, response);
|
||||
}
|
||||
return token;
|
||||
|
@ -50,7 +50,7 @@
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class DelegationTokenAuthenticator implements Authenticator {
|
||||
private static Logger LOG =
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
|
||||
|
||||
private static final String CONTENT_TYPE = "Content-Type";
|
||||
|
@ -30,6 +30,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
@ -41,8 +42,7 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class KMSUtil {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSUtil.class);
|
||||
public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class);
|
||||
|
||||
private KMSUtil() { /* Hidden constructor */ }
|
||||
|
||||
@ -64,6 +64,13 @@ public static KeyProvider createKeyProvider(final Configuration conf,
|
||||
if (providerUriStr == null || providerUriStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests(
|
||||
providerUriStr, conf);
|
||||
if (kp != null) {
|
||||
LOG.info("KeyProvider is created with uri: {}. This should happen only " +
|
||||
"in tests.", providerUriStr);
|
||||
return kp;
|
||||
}
|
||||
return createKeyProviderFromUri(conf, URI.create(providerUriStr));
|
||||
}
|
||||
|
||||
@ -205,4 +212,38 @@ public static KeyProvider.Metadata parseJSONMetadata(Map valueMap) {
|
||||
}
|
||||
return metadata;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a key provider from token service field, which must be URI format.
|
||||
*
|
||||
* @param conf
|
||||
* @param tokenServiceValue
|
||||
* @return new KeyProvider or null
|
||||
* @throws IOException
|
||||
*/
|
||||
public static KeyProvider createKeyProviderFromTokenService(
|
||||
final Configuration conf, final String tokenServiceValue)
|
||||
throws IOException {
|
||||
LOG.debug("Creating key provider from token service value {}. ",
|
||||
tokenServiceValue);
|
||||
final KeyProvider kp = KMSUtilFaultInjector.get()
|
||||
.createKeyProviderForTests(tokenServiceValue, conf);
|
||||
if (kp != null) {
|
||||
LOG.info("KeyProvider is created with uri: {}. This should happen only "
|
||||
+ "in tests.", tokenServiceValue);
|
||||
return kp;
|
||||
}
|
||||
if (!tokenServiceValue.contains("://")) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid token service " + tokenServiceValue);
|
||||
}
|
||||
final URI tokenServiceUri;
|
||||
try {
|
||||
tokenServiceUri = new URI(tokenServiceValue);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid token service " + tokenServiceValue, e);
|
||||
}
|
||||
return createKeyProviderFromUri(conf, tokenServiceUri);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Used for returning custom KeyProvider from test methods.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class KMSUtilFaultInjector {
|
||||
private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector();
|
||||
|
||||
public static KMSUtilFaultInjector get() {
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static void set(KMSUtilFaultInjector injector) {
|
||||
instance = injector;
|
||||
}
|
||||
|
||||
public KeyProvider createKeyProviderForTests(String value, Configuration conf)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -12,3 +12,4 @@
|
||||
# limitations under the License.
|
||||
#
|
||||
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
|
||||
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier
|
||||
|
@ -11,4 +11,5 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
|
||||
org.apache.hadoop.crypto.key.kms.KMSTokenRenewer
|
||||
org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer
|
@ -2602,6 +2602,26 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hadoop.security.kms.client.copy.legacy.token</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Expert only. Whether the KMS client provider should copy a token to legacy
|
||||
kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the
|
||||
default value set to true, the client will locally duplicate the
|
||||
KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field
|
||||
conforming to kms-dt. All other parts of the token remain the same.
|
||||
Then the new clients will use KMS_DELEGATION_TOKEN and old clients will
|
||||
use kms-dt to authenticate. Default value is true.
|
||||
You should only change this to false if you know all the KMS servers
|
||||
, clients (including both job submitters and job runners) and the
|
||||
token renewers (usually Yarn RM) are on a version that supports
|
||||
KMS_DELEGATION_TOKEN.
|
||||
Turning this off prematurely may result in old clients failing to
|
||||
authenticate with new servers.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hadoop.security.kms.client.failover.sleep.max.millis</name>
|
||||
<value>2000</value>
|
||||
|
@ -0,0 +1,162 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key.kms;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.event.Level;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit test for {@link KMSClientProvider} class.
|
||||
*/
|
||||
public class TestKMSClientProvider {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestKMSClientProvider.class);
|
||||
|
||||
private final Token token = new Token();
|
||||
private final Token legacyToken = new Token();
|
||||
private final String uriString = "kms://https@host:16000/kms";
|
||||
private final String legacyTokenService = "host:16000";
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(30000);
|
||||
|
||||
{
|
||||
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
SecurityUtil.setTokenServiceUseIp(false);
|
||||
token.setKind(TOKEN_KIND);
|
||||
token.setService(new Text(uriString));
|
||||
legacyToken.setKind(TOKEN_LEGACY_KIND);
|
||||
legacyToken.setService(new Text(legacyTokenService));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNotCopyFromLegacyToken() throws Exception {
|
||||
final DelegationTokenAuthenticatedURL url =
|
||||
mock(DelegationTokenAuthenticatedURL.class);
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(uriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||
try {
|
||||
final KMSClientProvider spyKp = spy(kp);
|
||||
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
|
||||
when(url.getDelegationToken(any(URL.class),
|
||||
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
|
||||
any(String.class))).thenReturn(legacyToken);
|
||||
|
||||
final Credentials creds = new Credentials();
|
||||
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
|
||||
LOG.info("Got tokens: {}", tokens);
|
||||
assertEquals(1, tokens.length);
|
||||
LOG.info("uri:" + uriString);
|
||||
// if KMS server returned a legacy token, new client should leave the
|
||||
// service being legacy and not set uri string
|
||||
assertEquals(legacyTokenService, tokens[0].getService().toString());
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyFromToken() throws Exception {
|
||||
final DelegationTokenAuthenticatedURL url =
|
||||
mock(DelegationTokenAuthenticatedURL.class);
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(uriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||
try {
|
||||
final KMSClientProvider spyKp = spy(kp);
|
||||
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
|
||||
when(url.getDelegationToken(any(URL.class),
|
||||
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
|
||||
any(String.class))).thenReturn(token);
|
||||
|
||||
final Credentials creds = new Credentials();
|
||||
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
|
||||
LOG.info("Got tokens: {}", tokens);
|
||||
assertEquals(2, tokens.length);
|
||||
assertTrue(creds.getAllTokens().contains(token));
|
||||
assertNotNull(creds.getToken(legacyToken.getService()));
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectTokenWhenBothExist() throws Exception {
|
||||
final Credentials creds = new Credentials();
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(uriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||
try {
|
||||
creds.addToken(token.getService(), token);
|
||||
creds.addToken(legacyToken.getService(), legacyToken);
|
||||
Token t = kp.selectKMSDelegationToken(creds);
|
||||
assertEquals(token, t);
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectTokenLegacyService() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final URI uri = new URI(uriString);
|
||||
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||
try {
|
||||
Text legacyService = new Text(legacyTokenService);
|
||||
token.setService(legacyService);
|
||||
final Credentials creds = new Credentials();
|
||||
creds.addToken(legacyService, token);
|
||||
Token t = kp.selectKMSDelegationToken(creds);
|
||||
assertEquals(token, t);
|
||||
} finally {
|
||||
kp.close();
|
||||
}
|
||||
}
|
||||
}
|
@ -42,7 +42,8 @@
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.junit.After;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.KMSUtilFaultInjector;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
@ -56,33 +57,68 @@ public static void setup() throws IOException {
|
||||
SecurityUtil.setTokenServiceUseIp(false);
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws IOException {
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = false;
|
||||
private void setKMSUtilFaultInjector() {
|
||||
KMSUtilFaultInjector injector = new KMSUtilFaultInjector() {
|
||||
@Override
|
||||
public KeyProvider createKeyProviderForTests(
|
||||
String value, Configuration conf) throws IOException {
|
||||
return TestLoadBalancingKMSClientProvider
|
||||
.createKeyProviderForTests(value, conf);
|
||||
}
|
||||
};
|
||||
KMSUtilFaultInjector.set(injector);
|
||||
}
|
||||
|
||||
public static KeyProvider createKeyProviderForTests(
|
||||
String value, Configuration conf) throws IOException {
|
||||
// The syntax for kms servers will be
|
||||
// kms://http@localhost:port1/kms,kms://http@localhost:port2/kms
|
||||
if (!value.contains(",")) {
|
||||
return null;
|
||||
}
|
||||
String[] keyProviderUrisStr = value.split(",");
|
||||
KMSClientProvider[] keyProviderArr =
|
||||
new KMSClientProvider[keyProviderUrisStr.length];
|
||||
|
||||
int i = 0;
|
||||
for (String keyProviderUri: keyProviderUrisStr) {
|
||||
KMSClientProvider kmcp =
|
||||
new KMSClientProvider(URI.create(keyProviderUri), conf, URI
|
||||
.create(value));
|
||||
keyProviderArr[i] = kmcp;
|
||||
i++;
|
||||
}
|
||||
LoadBalancingKMSClientProvider lbkcp =
|
||||
new LoadBalancingKMSClientProvider(keyProviderArr, conf);
|
||||
return lbkcp;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreation() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
||||
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
|
||||
"kms://http@host1/kms/foo"), conf);
|
||||
"kms://http@host1:9600/kms/foo"), conf);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
KMSClientProvider[] providers =
|
||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||
assertEquals(1, providers.length);
|
||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
|
||||
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
|
||||
Sets.newHashSet(providers[0].getKMSUrl()));
|
||||
|
||||
kp = new KMSClientProvider.Factory().createProvider(new URI(
|
||||
"kms://http@host1;host2;host3/kms/foo"), conf);
|
||||
setKMSUtilFaultInjector();
|
||||
String uriStr = "kms://http@host1:9600/kms/foo," +
|
||||
"kms://http@host2:9600/kms/foo," +
|
||||
"kms://http@host3:9600/kms/foo";
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||
uriStr);
|
||||
kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic
|
||||
.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||
providers =
|
||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||
assertEquals(3, providers.length);
|
||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
|
||||
"http://host2/kms/foo/v1/",
|
||||
"http://host3/kms/foo/v1/"),
|
||||
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
|
||||
"http://host2:9600/kms/foo/v1/",
|
||||
"http://host3:9600/kms/foo/v1/"),
|
||||
Sets.newHashSet(providers[0].getKMSUrl(),
|
||||
providers[1].getKMSUrl(),
|
||||
providers[2].getKMSUrl()));
|
||||
@ -208,7 +244,7 @@ private static <E extends Throwable> void throwException(Throwable ex)
|
||||
|
||||
private class MyKMSClientProvider extends KMSClientProvider {
|
||||
public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
super(uri, conf);
|
||||
super(uri, conf, uri);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -245,9 +281,8 @@ public KeyVersion rollNewVersion(final String name)
|
||||
@Test
|
||||
public void testClassCastException() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
||||
KMSClientProvider p1 = new MyKMSClientProvider(
|
||||
new URI("kms://http@host1/kms/foo"), conf);
|
||||
new URI("kms://http@host1:9600/kms/foo"), conf);
|
||||
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||
new KMSClientProvider[] {p1}, 0, conf);
|
||||
try {
|
||||
|
@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test {@link KMSUtil}.
|
||||
*/
|
||||
public class TestKMSUtil {
|
||||
|
||||
public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class);
|
||||
|
||||
@Rule
|
||||
public Timeout globalTimeout = new Timeout(90000);
|
||||
|
||||
@Test
|
||||
public void testCreateKeyProviderFromTokenService() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf,
|
||||
"kms://https@localhost:9600/kms");
|
||||
assertNotNull(kp);
|
||||
kp.close();
|
||||
|
||||
kp = KMSUtil.createKeyProviderFromTokenService(conf,
|
||||
"kms://https@localhost:9600/kms,kms://localhost1:9600/kms");
|
||||
assertNotNull(kp);
|
||||
kp.close();
|
||||
|
||||
String invalidService = "whatever:9600";
|
||||
try {
|
||||
KMSUtil.createKeyProviderFromTokenService(conf, invalidService);
|
||||
} catch (Exception ex) {
|
||||
LOG.info("Expected exception:", ex);
|
||||
assertTrue(ex instanceof IllegalArgumentException);
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"Invalid token service " + invalidService, ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,3 +1,4 @@
|
||||
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
@ -31,26 +32,35 @@
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
|
||||
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
||||
import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
|
||||
import org.apache.hadoop.crypto.key.kms.ValueQueue;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.KMSUtilFaultInjector;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
@ -71,7 +81,6 @@
|
||||
import java.io.InputStream;
|
||||
import java.io.Writer;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
@ -96,6 +105,10 @@
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -113,6 +126,20 @@ public class TestKMS {
|
||||
|
||||
private SSLFactory sslFactory;
|
||||
|
||||
private final KMSUtilFaultInjector oldInjector =
|
||||
KMSUtilFaultInjector.get();
|
||||
|
||||
// Injector to create providers with different ports. Can only happen in tests
|
||||
private final KMSUtilFaultInjector testInjector =
|
||||
new KMSUtilFaultInjector() {
|
||||
@Override
|
||||
public KeyProvider createKeyProviderForTests(String value,
|
||||
Configuration conf) throws IOException {
|
||||
return TestLoadBalancingKMSClientProvider
|
||||
.createKeyProviderForTests(value, conf);
|
||||
}
|
||||
};
|
||||
|
||||
// Keep track of all key providers created during a test case, so they can be
|
||||
// closed at test tearDown.
|
||||
private List<KeyProvider> providersCreated = new LinkedList<>();
|
||||
@ -122,7 +149,12 @@ public class TestKMS {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
setUpMiniKdc();
|
||||
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
||||
GenericTestUtils
|
||||
.setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
|
||||
GenericTestUtils
|
||||
.setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
|
||||
GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
|
||||
// resetting kerberos security
|
||||
Configuration conf = new Configuration();
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
@ -141,24 +173,78 @@ public static File getTestDir() throws Exception {
|
||||
}
|
||||
|
||||
public static abstract class KMSCallable<T> implements Callable<T> {
|
||||
private URL kmsUrl;
|
||||
private List<URL> kmsUrl;
|
||||
|
||||
protected URL getKMSUrl() {
|
||||
return kmsUrl;
|
||||
return kmsUrl.get(0);
|
||||
}
|
||||
|
||||
protected URL[] getKMSHAUrl() {
|
||||
URL[] urls = new URL[kmsUrl.size()];
|
||||
return kmsUrl.toArray(urls);
|
||||
}
|
||||
|
||||
protected void addKMSUrl(URL url) {
|
||||
if (kmsUrl == null) {
|
||||
kmsUrl = new ArrayList<URL>();
|
||||
}
|
||||
kmsUrl.add(url);
|
||||
}
|
||||
|
||||
/*
|
||||
* The format of the returned value will be
|
||||
* kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
|
||||
*/
|
||||
protected String generateLoadBalancingKeyProviderUriString() {
|
||||
if (kmsUrl == null || kmsUrl.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
StringBuffer sb = new StringBuffer();
|
||||
|
||||
for (int i = 0; i < kmsUrl.size(); i++) {
|
||||
sb.append(KMSClientProvider.SCHEME_NAME + "://" +
|
||||
kmsUrl.get(0).getProtocol() + "@");
|
||||
URL url = kmsUrl.get(i);
|
||||
sb.append(url.getAuthority());
|
||||
if (url.getPath() != null) {
|
||||
sb.append(url.getPath());
|
||||
}
|
||||
if (i < kmsUrl.size() - 1) {
|
||||
sb.append(",");
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
protected KeyProvider createProvider(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
final KeyProvider ret = new LoadBalancingKMSClientProvider(
|
||||
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
|
||||
new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
|
||||
providersCreated.add(ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* create a LoadBalancingKMSClientProvider from an array of URIs.
|
||||
* @param uris an array of KMS URIs
|
||||
* @param conf configuration object
|
||||
* @return a LoadBalancingKMSClientProvider object
|
||||
* @throws IOException
|
||||
*/
|
||||
protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
|
||||
Configuration conf, String originalUri) throws IOException {
|
||||
KMSClientProvider[] providers = new KMSClientProvider[uris.length];
|
||||
for (int i = 0; i < providers.length; i++) {
|
||||
providers[i] =
|
||||
new KMSClientProvider(uris[i], conf, URI.create(originalUri));
|
||||
}
|
||||
return new LoadBalancingKMSClientProvider(providers, conf);
|
||||
}
|
||||
|
||||
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
final KMSClientProvider ret = new KMSClientProvider(uri, conf);
|
||||
final KMSClientProvider ret = new KMSClientProvider(uri, conf, uri);
|
||||
providersCreated.add(ret);
|
||||
return ret;
|
||||
}
|
||||
@ -170,22 +256,33 @@ protected <T> T runServer(String keystore, String password, File confDir,
|
||||
|
||||
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServer(new int[] {port}, keystore, password, confDir, callable);
|
||||
}
|
||||
|
||||
protected <T> T runServer(int[] ports, String keystore, String password,
|
||||
File confDir, KMSCallable<T> callable) throws Exception {
|
||||
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
||||
.setLog4jConfFile("log4j.properties");
|
||||
if (keystore != null) {
|
||||
miniKMSBuilder.setSslConf(new File(keystore), password);
|
||||
}
|
||||
if (port > 0) {
|
||||
miniKMSBuilder.setPort(port);
|
||||
final List<MiniKMS> kmsList = new ArrayList<>();
|
||||
for (int i=0; i< ports.length; i++) {
|
||||
if (ports[i] > 0) {
|
||||
miniKMSBuilder.setPort(ports[i]);
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
kmsList.add(miniKMS);
|
||||
miniKMS.start();
|
||||
LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.addKMSUrl(miniKMS.getKMSUrl());
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
miniKMS.start();
|
||||
try {
|
||||
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.kmsUrl = miniKMS.getKMSUrl();
|
||||
return callable.call();
|
||||
} finally {
|
||||
miniKMS.stop();
|
||||
for (MiniKMS miniKMS: kmsList) {
|
||||
miniKMS.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -240,6 +337,13 @@ public static URI createKMSUri(URL kmsUrl) throws Exception {
|
||||
return new URI("kms://" + str);
|
||||
}
|
||||
|
||||
public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
|
||||
URI[] uris = new URI[kmsUrls.length];
|
||||
for (int i = 0; i < kmsUrls.length; i++) {
|
||||
uris[i] = createKMSUri(kmsUrls[i]);
|
||||
}
|
||||
return uris;
|
||||
}
|
||||
|
||||
private static class KerberosConfiguration
|
||||
extends javax.security.auth.login.Configuration {
|
||||
@ -315,19 +419,17 @@ private static void setUpMiniKdc(Properties kdcConf) throws Exception {
|
||||
principals.toArray(new String[principals.size()]));
|
||||
}
|
||||
|
||||
private void setUpMiniKdc() throws Exception {
|
||||
@BeforeClass
|
||||
public static void setUpMiniKdc() throws Exception {
|
||||
Properties kdcConf = MiniKdc.createConf();
|
||||
setUpMiniKdc(kdcConf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (kdc != null) {
|
||||
kdc.stop();
|
||||
kdc = null;
|
||||
}
|
||||
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
||||
UserGroupInformation.reset();
|
||||
KMSUtilFaultInjector.set(oldInjector);
|
||||
if (!providersCreated.isEmpty()) {
|
||||
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||
for (KeyProvider kp : providersCreated) {
|
||||
@ -345,6 +447,14 @@ public void tearDown() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownMiniKdc() {
|
||||
if (kdc != null) {
|
||||
kdc.stop();
|
||||
kdc = null;
|
||||
}
|
||||
}
|
||||
|
||||
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
||||
throws Exception {
|
||||
UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath());
|
||||
@ -501,8 +611,10 @@ public Void run() throws Exception {
|
||||
Token<?>[] tokens =
|
||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
||||
.addDelegationTokens("myuser", new Credentials());
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
||||
assertEquals(2, tokens.length);
|
||||
assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
tokens[0].getKind());
|
||||
kp.close();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
@ -518,8 +630,9 @@ public Void run() throws Exception {
|
||||
Token<?>[] tokens =
|
||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
||||
.addDelegationTokens("myuser", new Credentials());
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
||||
assertEquals(2, tokens.length);
|
||||
assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
|
||||
kp.close();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -2011,7 +2124,6 @@ public Void run() throws Exception {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
nonKerberosUgi.addCredentials(credentials);
|
||||
|
||||
try {
|
||||
@ -2067,6 +2179,17 @@ public void testDelegationTokensOpsHttpsKerberized() throws Exception {
|
||||
testDelegationTokensOps(true, true);
|
||||
}
|
||||
|
||||
private Text getTokenService(KeyProvider provider) {
|
||||
assertTrue("KeyProvider should be an instance of KMSClientProvider",
|
||||
(provider instanceof LoadBalancingKMSClientProvider));
|
||||
assertEquals("Num client providers should be 1", 1,
|
||||
((LoadBalancingKMSClientProvider)provider).getProviders().length);
|
||||
Text tokenService =
|
||||
(((LoadBalancingKMSClientProvider)provider).getProviders()[0])
|
||||
.getDelegationTokenService();
|
||||
return tokenService;
|
||||
}
|
||||
|
||||
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
|
||||
throws Exception {
|
||||
final File confDir = getTestDir();
|
||||
@ -2098,11 +2221,16 @@ public Void call() throws Exception {
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||
createKMSUri(getKMSUrl()).toString());
|
||||
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
||||
|
||||
doAs("client", new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, clientConf);
|
||||
// Unset the conf value for key provider path just to be sure that
|
||||
// the key provider created for renew and cancel token is from
|
||||
// token service field.
|
||||
clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
// test delegation token retrieval
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
@ -2110,13 +2238,10 @@ public Void run() throws Exception {
|
||||
final Credentials credentials = new Credentials();
|
||||
final Token<?>[] tokens =
|
||||
kpdte.addDelegationTokens("client1", credentials);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
InetSocketAddress kmsAddr =
|
||||
new InetSocketAddress(getKMSUrl().getHost(),
|
||||
getKMSUrl().getPort());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
getKind());
|
||||
Text tokenService = getTokenService(kp);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
assertEquals(TOKEN_KIND,
|
||||
credentials.getToken(tokenService).getKind());
|
||||
|
||||
// Test non-renewer user cannot renew.
|
||||
for (Token<?> token : tokens) {
|
||||
@ -2243,12 +2368,11 @@ public Void call() throws Exception {
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||
createKMSUri(getKMSUrl()).toString());
|
||||
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
||||
final KeyProvider kp = createProvider(uri, clientConf);
|
||||
final KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(kp);
|
||||
final InetSocketAddress kmsAddr =
|
||||
new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
|
||||
|
||||
// Job 1 (e.g. YARN log aggregation job), with user DT.
|
||||
final Collection<Token<?>> job1Token = new HashSet<>();
|
||||
@ -2258,16 +2382,17 @@ public Void run() throws Exception {
|
||||
// Get a DT and use it.
|
||||
final Credentials credentials = new Credentials();
|
||||
kpdte.addDelegationTokens("client", credentials);
|
||||
Text tokenService = getTokenService(kp);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
|
||||
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
|
||||
|
||||
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
||||
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
|
||||
getCurrentUser().getCredentials().getAllTokens());
|
||||
Token<?> token =
|
||||
final Token<?> token =
|
||||
UserGroupInformation.getCurrentUser().getCredentials()
|
||||
.getToken(SecurityUtil.buildTokenService(kmsAddr));
|
||||
Assert.assertNotNull(token);
|
||||
.getToken(tokenService);
|
||||
assertNotNull(token);
|
||||
assertEquals(TOKEN_KIND, token.getKind());
|
||||
job1Token.add(token);
|
||||
|
||||
// Decode the token to get max time.
|
||||
@ -2302,17 +2427,16 @@ public Void run() throws Exception {
|
||||
// Get a new DT, but don't use it yet.
|
||||
final Credentials newCreds = new Credentials();
|
||||
kpdte.addDelegationTokens("client", newCreds);
|
||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
getKind());
|
||||
assertEquals(1, newCreds.getAllTokens().size());
|
||||
final Text tokenService = getTokenService(kp);
|
||||
assertEquals(TOKEN_KIND,
|
||||
newCreds.getToken(tokenService).getKind());
|
||||
|
||||
// Using job 1's DT should fail.
|
||||
final Credentials oldCreds = new Credentials();
|
||||
for (Token<?> token : job1Token) {
|
||||
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
|
||||
oldCreds
|
||||
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
|
||||
if (token.getKind().equals(TOKEN_KIND)) {
|
||||
oldCreds.addToken(tokenService, token);
|
||||
}
|
||||
}
|
||||
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
|
||||
@ -2326,12 +2450,11 @@ public Void run() throws Exception {
|
||||
}
|
||||
|
||||
// Using the new DT should succeed.
|
||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
getKind());
|
||||
assertEquals(1, newCreds.getAllTokens().size());
|
||||
assertEquals(TOKEN_KIND,
|
||||
newCreds.getToken(tokenService).getKind());
|
||||
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
|
||||
LOG.info("Credetials now are: {}", UserGroupInformation
|
||||
LOG.info("Credentials now are: {}", UserGroupInformation
|
||||
.getCurrentUser().getCredentials().getAllTokens());
|
||||
kp.getKeys();
|
||||
return null;
|
||||
@ -2357,7 +2480,13 @@ public void testKMSWithZKSignerAndDTSM() throws Exception {
|
||||
doKMSWithZK(true, true);
|
||||
}
|
||||
|
||||
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
|
||||
}
|
||||
|
||||
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||
KMSCallable<T> callable, int kmsSize) throws Exception {
|
||||
TestingServer zkServer = null;
|
||||
try {
|
||||
zkServer = new TestingServer();
|
||||
@ -2403,43 +2532,265 @@ public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
runServer(null, null, testDir, c);
|
||||
int[] ports = new int[kmsSize];
|
||||
for (int i = 0; i < ports.length; i++) {
|
||||
ports[i] = -1;
|
||||
}
|
||||
return runServer(ports, null, null, testDir, callable);
|
||||
} finally {
|
||||
if (zkServer != null) {
|
||||
zkServer.stop();
|
||||
zkServer.close();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KeyProvider kp = createProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
kp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
runServerWithZooKeeper(zkDTSM, zkSigner, c);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
|
||||
KMSCallable<Void> c = new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
||||
final Credentials credentials = new Credentials();
|
||||
final String lbUri = generateLoadBalancingKeyProviderUriString();
|
||||
final LoadBalancingKMSClientProvider lbkp =
|
||||
createHAProvider(uris, conf, lbUri);
|
||||
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
// Login as a Kerberos user principal using keytab.
|
||||
// Connect to KMS to create a delegation token and add it to credentials
|
||||
final String keyName = "k0";
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(lbkp);
|
||||
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
||||
kpdte.createKey(keyName, new KeyProvider.Options(conf));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
assertTokenIdentifierEquals(credentials);
|
||||
|
||||
final LoadBalancingKMSClientProvider lbkp1 =
|
||||
createHAProvider(uris, conf, lbUri);
|
||||
// verify both tokens can be used to authenticate
|
||||
for (Token t : credentials.getAllTokens()) {
|
||||
assertTokenAccess(lbkp1, keyName, t);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
runServerWithZooKeeper(true, true, c, 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the passed in credentials have 2 tokens, of kind
|
||||
* {@link KMSDelegationToken#TOKEN_KIND} and
|
||||
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
|
||||
* the same identifier.
|
||||
*/
|
||||
private void assertTokenIdentifierEquals(Credentials credentials)
|
||||
throws IOException {
|
||||
// verify the 2 tokens have the same identifier
|
||||
assertEquals(2, credentials.getAllTokens().size());
|
||||
Token token = null;
|
||||
Token legacyToken = null;
|
||||
for (Token t : credentials.getAllTokens()) {
|
||||
if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
|
||||
token = t;
|
||||
} else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
|
||||
legacyToken = t;
|
||||
}
|
||||
}
|
||||
assertNotNull(token);
|
||||
assertNotNull(legacyToken);
|
||||
final DelegationTokenIdentifier tokenId =
|
||||
(DelegationTokenIdentifier) token.decodeIdentifier();
|
||||
final DelegationTokenIdentifier legacyTokenId =
|
||||
(DelegationTokenIdentifier) legacyToken.decodeIdentifier();
|
||||
assertEquals("KMS DT and legacy dt should have identical identifier",
|
||||
tokenId, legacyTokenId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests token access with each providers in the
|
||||
* {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
|
||||
* kinds are compatible and can both be used to authenticate.
|
||||
*/
|
||||
private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
|
||||
final String keyName, final Token token) throws Exception {
|
||||
UserGroupInformation tokenUgi =
|
||||
UserGroupInformation.createUserForTesting("test", new String[] {});
|
||||
// Verify the tokens can authenticate to any KMS
|
||||
tokenUgi.addToken(token);
|
||||
tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
// Create a kms client with one provider at a time. Must use one
|
||||
// provider so that if it fails to authenticate, it does not fall
|
||||
// back to the next KMS instance.
|
||||
// It should succeed because its delegation token can access any
|
||||
// KMS instances.
|
||||
for (KMSClientProvider provider : lbkp.getProviders()) {
|
||||
if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
|
||||
.equals(provider.getDelegationTokenService())) {
|
||||
// Historically known issue: Legacy token can only work with the
|
||||
// key provider specified in the token's Service
|
||||
continue;
|
||||
}
|
||||
LOG.info("Rolling key {} via provider {} with token {}.", keyName,
|
||||
provider, token);
|
||||
provider.rollNewVersion(keyName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
|
||||
testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
|
||||
testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
|
||||
}
|
||||
|
||||
private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
|
||||
throws Exception {
|
||||
GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
|
||||
assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND);
|
||||
KMSCallable<Void> c = new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
||||
final Credentials credentials = new Credentials();
|
||||
// Create a UGI without Kerberos auth. It will be authenticated with
|
||||
// delegation token.
|
||||
final UserGroupInformation nonKerberosUgi =
|
||||
UserGroupInformation.getCurrentUser();
|
||||
final String lbUri = generateLoadBalancingKeyProviderUriString();
|
||||
final LoadBalancingKMSClientProvider lbkp =
|
||||
createHAProvider(uris, conf, lbUri);
|
||||
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
// Login as a Kerberos user principal using keytab.
|
||||
// Connect to KMS to create a delegation token and add it to credentials
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(lbkp);
|
||||
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// Test token renewal and cancellation
|
||||
final Collection<Token<? extends TokenIdentifier>> tokens =
|
||||
credentials.getAllTokens();
|
||||
doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
Assert.assertEquals(2, tokens.size());
|
||||
boolean tokenFound = false;
|
||||
for (Token token : tokens) {
|
||||
if (!tokenKind.equals(token.getKind())) {
|
||||
continue;
|
||||
} else {
|
||||
tokenFound = true;
|
||||
}
|
||||
KMSUtilFaultInjector.set(testInjector);
|
||||
setupConfForToken(token.getKind(), conf, lbUri);
|
||||
|
||||
LOG.info("Testing token: {}", token);
|
||||
long tokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
|
||||
Thread.sleep(10);
|
||||
long newTokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token {}, new lifetime:{}", token,
|
||||
newTokenLife);
|
||||
assertTrue(newTokenLife > tokenLife);
|
||||
|
||||
boolean canceled = false;
|
||||
// test delegation token cancellation
|
||||
if (!canceled) {
|
||||
token.cancel(conf);
|
||||
LOG.info("Cancelled token {}", token);
|
||||
canceled = true;
|
||||
}
|
||||
assertTrue("token should have been canceled", canceled);
|
||||
try {
|
||||
token.renew(conf);
|
||||
fail("should not be able to renew a canceled token " + token);
|
||||
} catch (Exception e) {
|
||||
LOG.info("Expected exception when renewing token", e);
|
||||
}
|
||||
}
|
||||
assertTrue("Should have found token kind " + tokenKind + " from "
|
||||
+ tokens, tokenFound);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
};
|
||||
runServerWithZooKeeper(true, true, c, 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set or unset the key provider configuration based on token kind.
|
||||
*/
|
||||
private void setupConfForToken(Text tokenKind, Configuration conf,
|
||||
String lbUri) {
|
||||
if (tokenKind.equals(TOKEN_KIND)) {
|
||||
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
} else {
|
||||
// conf is only required for legacy tokens to create provider,
|
||||
// new tokens create provider by parsing its own Service field
|
||||
assertEquals(TOKEN_LEGACY_KIND, tokenKind);
|
||||
conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProxyUserKerb() throws Exception {
|
||||
@ -2558,6 +2909,16 @@ public void testWebHDFSProxyUserSimple() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testTGTRenewal() throws Exception {
|
||||
shutdownMiniKdc();
|
||||
try {
|
||||
testTgtRenewalInt();
|
||||
} finally {
|
||||
shutdownMiniKdc();
|
||||
setUpMiniKdc();
|
||||
}
|
||||
}
|
||||
|
||||
private void testTgtRenewalInt() throws Exception {
|
||||
Properties kdcConf = MiniKdc.createConf();
|
||||
kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3");
|
||||
kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");
|
||||
|
Loading…
Reference in New Issue
Block a user