HADOOP-11113. Namenode not able to reconnect to KMS after KMS restart. (Arun Suresh via wang)
This commit is contained in:
parent
bbff96be48
commit
a4c9b80a7c
@ -780,6 +780,9 @@ Release 2.6.0 - UNRELEASED
|
||||
|
||||
HADOOP-11130. NFS updateMaps OS check is reversed (brandonli)
|
||||
|
||||
HADOOP-11113. Namenode not able to reconnect to KMS after KMS restart.
|
||||
(Arun Suresh via wang)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HADOOP-10734. Implement high-performance secure random number sources.
|
||||
|
@ -415,7 +415,7 @@ public HttpURLConnection run() throws Exception {
|
||||
return conn;
|
||||
}
|
||||
|
||||
private static <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
private <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
int expectedResponse, Class<T> klass)
|
||||
throws IOException {
|
||||
T ret = null;
|
||||
@ -427,6 +427,14 @@ private static <T> T call(HttpURLConnection conn, Map jsonOutput,
|
||||
conn.getInputStream().close();
|
||||
throw ex;
|
||||
}
|
||||
if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
|
||||
// Ideally, this should happen only when there is an Authentication
|
||||
// failure. Unfortunately, the AuthenticationFilter returns 403 when it
|
||||
// cannot authenticate (Since a 401 requires Server to send
|
||||
// WWW-Authenticate header as well)..
|
||||
KMSClientProvider.this.authToken =
|
||||
new DelegationTokenAuthenticatedURL.Token();
|
||||
}
|
||||
HttpExceptionUtils.validateResponse(conn, expectedResponse);
|
||||
if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())
|
||||
&& klass != null) {
|
||||
|
@ -43,12 +43,12 @@
|
||||
|
||||
public class MiniKMS {
|
||||
|
||||
private static Server createJettyServer(String keyStore, String password) {
|
||||
private static Server createJettyServer(String keyStore, String password, int inPort) {
|
||||
try {
|
||||
boolean ssl = keyStore != null;
|
||||
InetAddress localhost = InetAddress.getByName("localhost");
|
||||
String host = "localhost";
|
||||
ServerSocket ss = new ServerSocket(0, 50, localhost);
|
||||
ServerSocket ss = new ServerSocket((inPort < 0) ? 0 : inPort, 50, localhost);
|
||||
int port = ss.getLocalPort();
|
||||
ss.close();
|
||||
Server server = new Server(0);
|
||||
@ -91,6 +91,7 @@ public static class Builder {
|
||||
private String log4jConfFile;
|
||||
private File keyStoreFile;
|
||||
private String keyStorePassword;
|
||||
private int inPort = -1;
|
||||
|
||||
public Builder() {
|
||||
kmsConfDir = new File("target/test-classes").getAbsoluteFile();
|
||||
@ -111,6 +112,12 @@ public Builder setLog4jConfFile(String log4jConfFile) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setPort(int port) {
|
||||
Preconditions.checkArgument(port > 0, "input port must be greater than 0");
|
||||
this.inPort = port;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setSslConf(File keyStoreFile, String keyStorePassword) {
|
||||
Preconditions.checkNotNull(keyStoreFile, "keystore file is NULL");
|
||||
Preconditions.checkNotNull(keyStorePassword, "keystore password is NULL");
|
||||
@ -126,7 +133,7 @@ public MiniKMS build() {
|
||||
"KMS conf dir does not exist");
|
||||
return new MiniKMS(kmsConfDir.getAbsolutePath(), log4jConfFile,
|
||||
(keyStoreFile != null) ? keyStoreFile.getAbsolutePath() : null,
|
||||
keyStorePassword);
|
||||
keyStorePassword, inPort);
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,14 +142,16 @@ public MiniKMS build() {
|
||||
private String keyStore;
|
||||
private String keyStorePassword;
|
||||
private Server jetty;
|
||||
private int inPort;
|
||||
private URL kmsURL;
|
||||
|
||||
public MiniKMS(String kmsConfDir, String log4ConfFile, String keyStore,
|
||||
String password) {
|
||||
String password, int inPort) {
|
||||
this.kmsConfDir = kmsConfDir;
|
||||
this.log4jConfFile = log4ConfFile;
|
||||
this.keyStore = keyStore;
|
||||
this.keyStorePassword = password;
|
||||
this.inPort = inPort;
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
@ -174,7 +183,7 @@ public void start() throws Exception {
|
||||
writer.close();
|
||||
}
|
||||
System.setProperty("log4j.configuration", log4jConfFile);
|
||||
jetty = createJettyServer(keyStore, keyStorePassword);
|
||||
jetty = createJettyServer(keyStore, keyStorePassword, inPort);
|
||||
|
||||
// we need to do a special handling for MiniKMS to work when in a dir and
|
||||
// when in a JAR in the classpath thanks to Jetty way of handling of webapps
|
||||
|
@ -89,7 +89,7 @@ public static File getTestDir() throws Exception {
|
||||
return file;
|
||||
}
|
||||
|
||||
public static abstract class KMSCallable implements Callable<Void> {
|
||||
public static abstract class KMSCallable<T> implements Callable<T> {
|
||||
private URL kmsUrl;
|
||||
|
||||
protected URL getKMSUrl() {
|
||||
@ -97,19 +97,27 @@ protected URL getKMSUrl() {
|
||||
}
|
||||
}
|
||||
|
||||
protected void runServer(String keystore, String password, File confDir,
|
||||
KMSCallable callable) throws Exception {
|
||||
protected <T> T runServer(String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
return runServer(-1, keystore, password, confDir, callable);
|
||||
}
|
||||
|
||||
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
||||
KMSCallable<T> callable) throws Exception {
|
||||
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
||||
.setLog4jConfFile("log4j.properties");
|
||||
if (keystore != null) {
|
||||
miniKMSBuilder.setSslConf(new File(keystore), password);
|
||||
}
|
||||
if (port > 0) {
|
||||
miniKMSBuilder.setPort(port);
|
||||
}
|
||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||
miniKMS.start();
|
||||
try {
|
||||
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||
callable.kmsUrl = miniKMS.getKMSUrl();
|
||||
callable.call();
|
||||
return callable.call();
|
||||
} finally {
|
||||
miniKMS.stop();
|
||||
}
|
||||
@ -284,7 +292,7 @@ public void testStartStop(final boolean ssl, final boolean kerberos)
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(keystore, password, testDir, new KMSCallable() {
|
||||
runServer(keystore, password, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -351,7 +359,7 @@ public void testKMSProvider() throws Exception {
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k6.ALL", "*");
|
||||
writeConf(confDir, conf);
|
||||
|
||||
runServer(null, null, confDir, new KMSCallable() {
|
||||
runServer(null, null, confDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Date started = new Date();
|
||||
@ -616,7 +624,7 @@ public void testKeyACLs() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
@ -782,6 +790,92 @@ public Void run() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSRestart() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
final File testDir = getTestDir();
|
||||
conf = createBaseKMSConf(testDir);
|
||||
conf.set("hadoop.kms.authentication.kerberos.keytab",
|
||||
keytab.getAbsolutePath());
|
||||
conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
|
||||
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
|
||||
|
||||
for (KMSACLs.Type type : KMSACLs.Type.values()) {
|
||||
conf.set(type.getAclConfigKey(), type.toString());
|
||||
}
|
||||
conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),
|
||||
KMSACLs.Type.CREATE.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
|
||||
KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
|
||||
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.ALL", "*");
|
||||
conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
KMSCallable<KeyProvider> c =
|
||||
new KMSCallable<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
final URI uri = createKMSUri(getKMSUrl());
|
||||
|
||||
final KeyProvider kp =
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<KeyProvider>() {
|
||||
@Override
|
||||
public KeyProvider run() throws Exception {
|
||||
KMSClientProvider kp = new KMSClientProvider(uri, conf);
|
||||
kp.createKey("k1", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return kp;
|
||||
}
|
||||
});
|
||||
return kp;
|
||||
}
|
||||
};
|
||||
|
||||
final KeyProvider retKp =
|
||||
runServer(null, null, testDir, c);
|
||||
|
||||
// Restart server (using the same port)
|
||||
runServer(c.getKMSUrl().getPort(), null, null, testDir,
|
||||
new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||
doAs("SET_KEY_MATERIAL",
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
try {
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
Assert.fail("Should fail first time !!");
|
||||
} catch (IOException e) {
|
||||
String message = e.getMessage();
|
||||
Assert.assertTrue("Should be a 403 error : " + message,
|
||||
message.contains("403"));
|
||||
}
|
||||
retKp.createKey("k2", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
retKp.createKey("k3", new byte[16],
|
||||
new KeyProvider.Options(conf));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testACLs() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
@ -809,7 +903,7 @@ public void testACLs() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1117,7 +1211,7 @@ public void testKMSBlackList() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1201,7 +1295,7 @@ public void testServicePrincipalACLs() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1326,7 +1420,7 @@ public void testDelegationTokenAccess() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
@ -1398,7 +1492,7 @@ public void testProxyUser() throws Exception {
|
||||
|
||||
writeConf(testDir, conf);
|
||||
|
||||
runServer(null, null, testDir, new KMSCallable() {
|
||||
runServer(null, null, testDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
|
Loading…
Reference in New Issue
Block a user