HADOOP-14987. Improve KMSClientProvider log around delegation token checking. Contributed by Xiaoyu Yao and Xiao Chen.

This commit is contained in:
Xiaoyu Yao 2017-11-03 15:58:24 -07:00
parent b85603e3f8
commit 59d78a5088
3 changed files with 82 additions and 28 deletions

View File

@ -133,6 +133,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private static final ObjectWriter WRITER = private static final ObjectWriter WRITER =
new ObjectMapper().writerWithDefaultPrettyPrinter(); new ObjectMapper().writerWithDefaultPrettyPrinter();
private final Text dtService;
// Allow fallback to default kms server port 9600 for certain tests that do
// not specify the port explicitly in the kms provider url.
@VisibleForTesting
public static volatile boolean fallbackDefaultPortForTesting = false;
private class EncryptedQueueRefiller implements private class EncryptedQueueRefiller implements
ValueQueue.QueueRefiller<EncryptedKeyVersion> { ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@ -297,7 +304,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
} }
} }
private String kmsUrl; private URL kmsUrl;
private SSLFactory sslFactory; private SSLFactory sslFactory;
private ConnectionConfigurator configurator; private ConnectionConfigurator configurator;
private DelegationTokenAuthenticatedURL.Token authToken; private DelegationTokenAuthenticatedURL.Token authToken;
@ -349,7 +356,15 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
public KMSClientProvider(URI uri, Configuration conf) throws IOException { public KMSClientProvider(URI uri, Configuration conf) throws IOException {
super(conf); super(conf);
kmsUrl = createServiceURL(extractKMSPath(uri)); kmsUrl = createServiceURL(extractKMSPath(uri));
if ("https".equalsIgnoreCase(new URL(kmsUrl).getProtocol())) { int kmsPort = kmsUrl.getPort();
if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
kmsPort = 9600;
}
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
dtService = SecurityUtil.buildTokenService(addr);
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try { try {
sslFactory.init(); sslFactory.init();
@ -385,19 +400,20 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
new EncryptedQueueRefiller()); new EncryptedQueueRefiller());
authToken = new DelegationTokenAuthenticatedURL.Token(); authToken = new DelegationTokenAuthenticatedURL.Token();
LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
" created.", kmsUrl, dtService);
} }
private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException { private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
return ProviderUtils.unnestUri(uri); return ProviderUtils.unnestUri(uri);
} }
private static String createServiceURL(Path path) throws IOException { private static URL createServiceURL(Path path) throws IOException {
String str = new URL(path.toString()).toExternalForm(); String str = new URL(path.toString()).toExternalForm();
if (str.endsWith("/")) { if (str.endsWith("/")) {
str = str.substring(0, str.length() - 1); str = str.substring(0, str.length() - 1);
} }
return new URL(str + KMSRESTConstants.SERVICE_VERSION + "/"). return new URL(str + KMSRESTConstants.SERVICE_VERSION + "/");
toExternalForm();
} }
private URL createURL(String collection, String resource, String subResource, private URL createURL(String collection, String resource, String subResource,
@ -996,7 +1012,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
public Token<?>[] addDelegationTokens(final String renewer, public Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException { Credentials credentials) throws IOException {
Token<?>[] tokens = null; Token<?>[] tokens = null;
Text dtService = getDelegationTokenService();
Token<?> token = credentials.getToken(dtService); Token<?> token = credentials.getToken(dtService);
if (token == null) { if (token == null) {
final URL url = createURL(null, null, null, null); final URL url = createURL(null, null, null, null);
@ -1034,20 +1049,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return tokens; return tokens;
} }
private Text getDelegationTokenService() throws IOException {
URL url = new URL(kmsUrl);
InetSocketAddress addr = new InetSocketAddress(url.getHost(),
url.getPort());
Text dtService = SecurityUtil.buildTokenService(addr);
return dtService;
}
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException { private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
// Add existing credentials from the UGI, since provider is cached. // Add existing credentials from the UGI, since provider is cached.
Credentials creds = ugi.getCredentials(); Credentials creds = ugi.getCredentials();
if (!creds.getAllTokens().isEmpty()) { if (!creds.getAllTokens().isEmpty()) {
LOG.debug("Searching for token that matches service: {}", dtService);
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
dToken = creds.getToken(getDelegationTokenService()); dToken = creds.getToken(dtService);
if (dToken != null) { if (dToken != null) {
return true; return true;
} }
@ -1058,9 +1066,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private UserGroupInformation getActualUgi() throws IOException { private UserGroupInformation getActualUgi() throws IOException {
final UserGroupInformation currentUgi = UserGroupInformation final UserGroupInformation currentUgi = UserGroupInformation
.getCurrentUser(); .getCurrentUser();
if (LOG.isDebugEnabled()) {
UserGroupInformation.logAllUserInfo(currentUgi); UserGroupInformation.logAllUserInfo(LOG, currentUgi);
}
// Use current user by default // Use current user by default
UserGroupInformation actualUgi = currentUgi; UserGroupInformation actualUgi = currentUgi;
if (currentUgi.getRealUser() != null) { if (currentUgi.getRealUser() != null) {
@ -1099,6 +1107,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@VisibleForTesting @VisibleForTesting
String getKMSUrl() { String getKMSUrl() {
return kmsUrl; return kmsUrl.toString();
} }
} }

View File

@ -1990,20 +1990,51 @@ public class UserGroupInformation {
} }
} }
public static void logAllUserInfo(UserGroupInformation ugi) throws /**
IOException { * Log current UGI and token information into specified log.
if (LOG.isDebugEnabled()) { * @param ugi - UGI
LOG.debug("UGI: " + ugi); * @throws IOException
if (ugi.getRealUser() != null) { */
LOG.debug("+RealUGI: " + ugi.getRealUser()); @InterfaceAudience.LimitedPrivate({"HDFS", "KMS"})
} @InterfaceStability.Unstable
LOG.debug("+LoginUGI: " + ugi.getLoginUser()); public static void logUserInfo(Logger log, String caption,
UserGroupInformation ugi) throws IOException {
if (log.isDebugEnabled()) {
log.debug(caption + " UGI: " + ugi);
for (Token<?> token : ugi.getTokens()) { for (Token<?> token : ugi.getTokens()) {
LOG.debug("+UGI token:" + token); log.debug("+token:" + token);
} }
} }
} }
/**
* Log all (current, real, login) UGI and token info into specified log.
* @param ugi - UGI
* @throws IOException
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "KMS"})
@InterfaceStability.Unstable
public static void logAllUserInfo(Logger log, UserGroupInformation ugi) throws
IOException {
if (log.isDebugEnabled()) {
logUserInfo(log, "Current", ugi.getCurrentUser());
if (ugi.getRealUser() != null) {
logUserInfo(log, "Real", ugi.getRealUser());
}
logUserInfo(log, "Login", ugi.getLoginUser());
}
}
/**
* Log all (current, real, login) UGI and token info into UGI debug log.
* @param ugi - UGI
* @throws IOException
*/
public static void logAllUserInfo(UserGroupInformation ugi) throws
IOException {
logAllUserInfo(LOG, ugi);
}
private void print() throws IOException { private void print() throws IOException {
System.out.println("User: " + getUserName()); System.out.println("User: " + getUserName());
System.out.print("Group Ids: "); System.out.print("Group Ids: ");

View File

@ -39,8 +39,11 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.AuthorizationException;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -48,9 +51,20 @@ import com.google.common.collect.Sets;
public class TestLoadBalancingKMSClientProvider { public class TestLoadBalancingKMSClientProvider {
@BeforeClass
public static void setup() throws IOException {
SecurityUtil.setTokenServiceUseIp(false);
}
@After
public void teardown() throws IOException {
KMSClientProvider.fallbackDefaultPortForTesting = false;
}
@Test @Test
public void testCreation() throws Exception { public void testCreation() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI( KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1/kms/foo"), conf); "kms://http@host1/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider); assertTrue(kp instanceof LoadBalancingKMSClientProvider);
@ -231,6 +245,7 @@ public class TestLoadBalancingKMSClientProvider {
@Test @Test
public void testClassCastException() throws Exception { public void testClassCastException() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KMSClientProvider p1 = new MyKMSClientProvider( KMSClientProvider p1 = new MyKMSClientProvider(
new URI("kms://http@host1/kms/foo"), conf); new URI("kms://http@host1/kms/foo"), conf);
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(