HADOOP-11151. Automatically refresh auth token and retry on auth failure. Contributed by Arun Suresh.
This commit is contained in:
parent
054f285526
commit
2d8e6e2c4a
@ -250,7 +250,10 @@ public static void injectToken(HttpURLConnection conn, Token token) {
|
||||
* @throws AuthenticationException if an authentication exception occurred.
|
||||
*/
|
||||
public static void extractToken(HttpURLConnection conn, Token token) throws IOException, AuthenticationException {
|
||||
if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
|
||||
int respCode = conn.getResponseCode();
|
||||
if (respCode == HttpURLConnection.HTTP_OK
|
||||
|| respCode == HttpURLConnection.HTTP_CREATED
|
||||
|| respCode == HttpURLConnection.HTTP_ACCEPTED) {
|
||||
Map<String, List<String>> headers = conn.getHeaderFields();
|
||||
List<String> cookies = headers.get("Set-Cookie");
|
||||
if (cookies != null) {
|
||||
|
@ -786,6 +786,9 @@ Release 2.6.0 - UNRELEASED
|
||||
HADOOP-11160. Fix typo in nfs3 server duplicate entry reporting.
|
||||
(Charles Lamb via wheat9)
|
||||
|
||||
HADOOP-11151. Automatically refresh auth token and retry on auth failure.
|
||||
(Arun Suresh via wang)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HADOOP-10734. Implement high-performance secure random number sources.
|
||||
|
@ -29,6 +29,7 @@
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
@ -76,6 +77,8 @@
|
||||
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
|
||||
|
||||
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
|
||||
|
||||
public static final String TOKEN_KIND = "kms-dt";
|
||||
|
||||
public static final String SCHEME_NAME = "kms";
|
||||
@ -97,6 +100,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||
public static final String TIMEOUT_ATTR = CONFIG_PREFIX + "timeout";
|
||||
public static final int DEFAULT_TIMEOUT = 60;
|
||||
|
||||
/* Number of times to retry authentication in the event of auth failure
|
||||
* (normally happens due to stale authToken)
|
||||
*/
|
||||
public static final String AUTH_RETRY = CONFIG_PREFIX
|
||||
+ "authentication.retry-count";
|
||||
public static final int DEFAULT_AUTH_RETRY = 1;
|
||||
|
||||
private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
|
||||
|
||||
private class EncryptedQueueRefiller implements
|
||||
@ -238,6 +248,7 @@ public static String checkNotEmpty(String s, String name)
|
||||
private ConnectionConfigurator configurator;
|
||||
private DelegationTokenAuthenticatedURL.Token authToken;
|
||||
private UserGroupInformation loginUgi;
|
||||
private final int authRetry;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@ -296,6 +307,7 @@ public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||
}
|
||||
}
|
||||
int timeout = conf.getInt(TIMEOUT_ATTR, DEFAULT_TIMEOUT);
|
||||
authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
|
||||
configurator = new TimeoutConnConfigurator(timeout, sslFactory);
|
||||
encKeyVersionQueue =
|
||||
new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
|
||||
@ -416,7 +428,12 @@ public HttpURLConnection run() throws Exception {
|
||||
}
|
||||
|
||||
private <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
int expectedResponse, Class<T> klass)
|
||||
int expectedResponse, Class<T> klass) throws IOException {
|
||||
return call(conn, jsonOutput, expectedResponse, klass, authRetry);
|
||||
}
|
||||
|
||||
private <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
int expectedResponse, Class<T> klass, int authRetryCount)
|
||||
throws IOException {
|
||||
T ret = null;
|
||||
try {
|
||||
@ -427,13 +444,36 @@ private <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
conn.getInputStream().close();
|
||||
throw ex;
|
||||
}
|
||||
if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
|
||||
if ((conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN
|
||||
&& conn.getResponseMessage().equals(ANONYMOUS_REQUESTS_DISALLOWED))
|
||||
|| conn.getResponseCode() == HttpURLConnection.HTTP_UNAUTHORIZED) {
|
||||
// Ideally, this should happen only when there is an Authentication
|
||||
// failure. Unfortunately, the AuthenticationFilter returns 403 when it
|
||||
// cannot authenticate (Since a 401 requires Server to send
|
||||
// WWW-Authenticate header as well)..
|
||||
KMSClientProvider.this.authToken =
|
||||
new DelegationTokenAuthenticatedURL.Token();
|
||||
KMSClientProvider.this.loginUgi =
|
||||
UserGroupInformation.getCurrentUser();
|
||||
if (authRetryCount > 0) {
|
||||
String contentType = conn.getRequestProperty(CONTENT_TYPE);
|
||||
String requestMethod = conn.getRequestMethod();
|
||||
URL url = conn.getURL();
|
||||
conn = createConnection(url, requestMethod);
|
||||
conn.setRequestProperty(CONTENT_TYPE, contentType);
|
||||
return call(conn, jsonOutput, expectedResponse, klass,
|
||||
authRetryCount - 1);
|
||||
}
|
||||
}
|
||||
try {
|
||||
AuthenticatedURL.extractToken(conn, authToken);
|
||||
} catch (AuthenticationException e) {
|
||||
// Ignore the AuthExceptions.. since we are just using the method to
|
||||
// extract and set the authToken.. (Workaround till we actually fix
|
||||
// AuthenticatedURL properly to set authToken post initialization)
|
||||
} finally {
|
||||
KMSClientProvider.this.loginUgi =
|
||||
UserGroupInformation.getCurrentUser();
|
||||
}
|
||||
HttpExceptionUtils.validateResponse(conn, expectedResponse);
|
||||
if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())
|
||||
|
@ -1606,6 +1606,13 @@ for ldap providers in the same way as above does.
|
||||
</property>
|
||||
|
||||
<!--- KMSClientProvider configurations -->
|
||||
<property>
|
||||
<name>hadoop.security.kms.client.authentication.retry-count</name>
|
||||
<value>1</value>
|
||||
<description>
|
||||
Number of time to retry connecting to KMS on authentication failure
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hadoop.security.kms.client.encrypted.key.cache.size</name>
|
||||
<value>500</value>
|
||||
|
@ -32,8 +32,10 @@
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -49,6 +51,8 @@
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
@ -791,12 +795,24 @@ public Void run() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSRestart() throws Exception {
|
||||
public void testKMSRestartKerberosAuth() throws Exception {
|
||||
doKMSRestart(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSRestartSimpleAuth() throws Exception {
|
||||
doKMSRestart(false);
|
||||
}
|
||||
|
||||
public void doKMSRestart(boolean useKrb) throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
final File testDir = getTestDir();
|
||||
conf = createBaseKMSConf(testDir);
|
||||
if (useKrb) {
|
||||
conf.set("hadoop.kms.authentication.type", "kerberos");
|
||||
}
|
||||
conf.set("hadoop.kms.authentication.kerberos.keytab",
|
||||
keytab.getAbsolutePath());
|
||||
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
|
||||
@ -855,15 +871,6 @@ public Void call() throws Exception {
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
try {
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
Assert.fail("Should fail first time !!");
|
||||
} catch (IOException e) {
|
||||
String message = e.getMessage();
|
||||
Assert.assertTrue("Should be a 403 error : " + message,
|
||||
message.contains("403"));
|
||||
}
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
retKp.createKey("k3", new byte[16],
|
||||
@ -876,6 +883,106 @@ public Void run() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSAuthFailureRetry() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
final File testDir = getTestDir();
|
||||
conf = createBaseKMSConf(testDir);
|
||||
conf.set("hadoop.kms.authentication.kerberos.keytab",
|
||||
keytab.getAbsolutePath());
|
||||
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
|
||||
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
|
||||
|
||||
for (KMSACLs.Type type : KMSACLs.Type.values()) {
|
||||
conf.set(type.getAclConfigKey(), type.toString());
|
||||
}
|
||||
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),
|
||||
KMSACLs.Type.CREATE.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
|
||||
KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k4.ALL", "*");
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir,
|
||||
new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
makeAuthTokenStale(kp);
|
||||
kp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// Test retry count
|
||||
runServer(null, null, testDir,
|
||||
new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
conf.setInt(KMSClientProvider.AUTH_RETRY, 0);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
kp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
makeAuthTokenStale(kp);
|
||||
try {
|
||||
kp.createKey("k4", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
Assert.fail("Shoud fail since retry count == 0");
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(
|
||||
"HTTP exception must be a 403 : " + e.getMessage(), e
|
||||
.getMessage().contains("403"));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void makeAuthTokenStale(KMSClientProvider kp) throws Exception {
|
||||
Field tokF = KMSClientProvider.class.getDeclaredField("authToken");
|
||||
tokF.setAccessible(true);
|
||||
DelegationTokenAuthenticatedURL.Token delToken =
|
||||
(DelegationTokenAuthenticatedURL.Token) tokF.get(kp);
|
||||
String oldTokStr = delToken.toString();
|
||||
Method setM =
|
||||
AuthenticatedURL.Token.class.getDeclaredMethod("set", String.class);
|
||||
setM.setAccessible(true);
|
||||
String newTokStr = oldTokStr.replaceAll("e=[^&]*", "e=1000");
|
||||
setM.invoke(((AuthenticatedURL.Token)delToken), newTokStr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testACLs() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
Loading…
Reference in New Issue
Block a user