From 8ebf2e95d2053cb94c6ff87ca018811fe8276f2b Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Thu, 28 Jul 2016 18:23:51 -0700 Subject: [PATCH] HADOOP-13381. KMS clients should use KMS Delegation Tokens from current UGI. Contributed by Xiao Chen. --- .../crypto/key/kms/KMSClientProvider.java | 23 +++- .../hadoop/crypto/key/kms/server/TestKMS.java | 125 ++++++++++++++++++ 2 files changed, 146 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 7e06ddd40a..47549f7ee9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -38,6 +38,7 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; @@ -536,8 +537,12 @@ private HttpURLConnection createConnection(final URL url, String method) UserGroupInformation.AuthenticationMethod.PROXY) ? currentUgi.getShortUserName() : null; - // creating the HTTP connection using the current UGI at constructor time - conn = actualUgi.doAs(new PrivilegedExceptionAction() { + // If current UGI contains kms-dt && is not proxy, doAs it to use its dt. + // Otherwise, create the HTTP connection using the UGI at constructor time + UserGroupInformation ugiToUse = + (currentUgiContainsKmsDt() && doAsUser == null) ? + currentUgi : actualUgi; + conn = ugiToUse.doAs(new PrivilegedExceptionAction() { @Override public HttpURLConnection run() throws Exception { DelegationTokenAuthenticatedURL authUrl = @@ -1041,6 +1046,20 @@ private Text getDelegationTokenService() throws IOException { return dtService; } + private boolean currentUgiContainsKmsDt() throws IOException { + // Add existing credentials from current UGI, since provider is cached. + Credentials creds = UserGroupInformation.getCurrentUser(). + getCredentials(); + if (!creds.getAllTokens().isEmpty()) { + org.apache.hadoop.security.token.Token + dToken = creds.getToken(getDelegationTokenService()); + if (dToken != null) { + return true; + } + } + return false; + } + /** * Shutdown valueQueue executor threads */ diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java index e3b30a121a..61b9a90b1b 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java +++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java @@ -38,7 +38,9 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -50,6 +52,8 @@ import javax.security.auth.login.AppConfigurationEntry; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -62,8 +66,10 @@ import java.net.URL; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -1876,6 +1882,125 @@ public Void run() throws Exception { }); } + @Test + public void testDelegationTokensUpdatedInUGI() throws Exception { + Configuration conf = new Configuration(); + UserGroupInformation.setConfiguration(conf); + File confDir = getTestDir(); + conf = createBaseKMSConf(confDir); + conf.set( + "hadoop.kms.authentication.delegation-token.max-lifetime.sec", "5"); + conf.set( + "hadoop.kms.authentication.delegation-token.renew-interval.sec", "5"); + writeConf(confDir, conf); + + // Running as a service (e.g. Yarn in practice). + runServer(null, null, confDir, new KMSCallable() { + @Override + public Void call() throws Exception { + final Configuration clientConf = new Configuration(); + final URI uri = createKMSUri(getKMSUrl()); + clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, + createKMSUri(getKMSUrl()).toString()); + final KeyProvider kp = createProvider(uri, clientConf); + final KeyProviderDelegationTokenExtension kpdte = + KeyProviderDelegationTokenExtension. + createKeyProviderDelegationTokenExtension(kp); + final InetSocketAddress kmsAddr = + new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort()); + + // Job 1 (e.g. Yarn log aggregation job), with user DT. + final Collection> job1Token = new HashSet<>(); + doAs("client", new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // Get a DT and use it. + final Credentials credentials = new Credentials(); + kpdte.addDelegationTokens("client", credentials); + Assert.assertEquals(1, credentials.getAllTokens().size()); + Assert.assertEquals(KMSClientProvider.TOKEN_KIND, credentials. + getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind()); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + LOG.info("Added kms dt to credentials: {}", UserGroupInformation. + getCurrentUser().getCredentials().getAllTokens()); + Token token = + UserGroupInformation.getCurrentUser().getCredentials() + .getToken(SecurityUtil.buildTokenService(kmsAddr)); + Assert.assertNotNull(token); + job1Token.add(token); + + // Decode the token to get max time. + ByteArrayInputStream buf = + new ByteArrayInputStream(token.getIdentifier()); + DataInputStream dis = new DataInputStream(buf); + DelegationTokenIdentifier id = + new DelegationTokenIdentifier(token.getKind()); + id.readFields(dis); + dis.close(); + final long maxTime = id.getMaxDate(); + + // wait for token to expire. + Thread.sleep(5100); + Assert.assertTrue("maxTime " + maxTime + " is not less than now.", + maxTime > 0 && maxTime < Time.now()); + try { + kp.getKeys(); + Assert.fail("Operation should fail since dt is expired."); + } catch (Exception e) { + LOG.info("Expected error.", e); + } + return null; + } + }); + Assert.assertFalse(job1Token.isEmpty()); + + // job 2 (e.g. Another Yarn log aggregation job, with user DT. + doAs("client", new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // Get a new DT, but don't use it yet. + final Credentials newCreds = new Credentials(); + kpdte.addDelegationTokens("client", newCreds); + Assert.assertEquals(1, newCreds.getAllTokens().size()); + Assert.assertEquals(KMSClientProvider.TOKEN_KIND, + newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). + getKind()); + + // Using job 1's DT should fail. + final Credentials oldCreds = new Credentials(); + for (Token token : job1Token) { + if (token.getKind().equals(KMSClientProvider.TOKEN_KIND)) { + oldCreds + .addToken(SecurityUtil.buildTokenService(kmsAddr), token); + } + } + UserGroupInformation.getCurrentUser().addCredentials(oldCreds); + LOG.info("Added old kms dt to credentials: {}", UserGroupInformation + .getCurrentUser().getCredentials().getAllTokens()); + try { + kp.getKeys(); + Assert.fail("Operation should fail since dt is expired."); + } catch (Exception e) { + LOG.info("Expected error.", e); + } + + // Using the new DT should succeed. + Assert.assertEquals(1, newCreds.getAllTokens().size()); + Assert.assertEquals(KMSClientProvider.TOKEN_KIND, + newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). + getKind()); + UserGroupInformation.getCurrentUser().addCredentials(newCreds); + LOG.info("Credetials now are: {}", UserGroupInformation + .getCurrentUser().getCredentials().getAllTokens()); + kp.getKeys(); + return null; + } + }); + return null; + } + }); + } + @Test public void testKMSWithZKSigner() throws Exception { doKMSWithZK(true, false);