Hadoop 16524 - resubmission following some unit test fixes (#2693)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Borislav Iordanov 2021-03-31 13:07:42 -04:00 committed by GitHub
parent d69088a097
commit 9509bebf7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 715 additions and 197 deletions

View File

@ -27,14 +27,17 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.nio.file.Paths;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.Enumeration;
import java.util.Arrays;
import java.util.Timer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -75,6 +78,8 @@
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
import org.apache.hadoop.security.ssl.FileMonitoringTimerTask;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
@ -186,6 +191,7 @@ public final class HttpServer2 implements FilterContainer {
static final String STATE_DESCRIPTION_ALIVE = " - alive";
static final String STATE_DESCRIPTION_NOT_LIVE = " - not live";
private final SignerSecretProvider secretProvider;
private final Optional<java.util.Timer> configurationChangeMonitor;
private XFrameOption xFrameOption;
private boolean xFrameOptionIsEnabled;
public static final String HTTP_HEADER_PREFIX = "hadoop.http.header.";
@ -244,6 +250,8 @@ public static class Builder {
private boolean sniHostCheckEnabled;
private Optional<Timer> configurationChangeMonitor = Optional.empty();
public Builder setName(String name){
this.name = name;
return this;
@ -574,12 +582,45 @@ private ServerConnector createHttpsChannelConnector(
}
setEnabledProtocols(sslContextFactory);
long storesReloadInterval =
conf.getLong(FileBasedKeyStoresFactory.SSL_STORES_RELOAD_INTERVAL_TPL_KEY,
FileBasedKeyStoresFactory.DEFAULT_SSL_STORES_RELOAD_INTERVAL);
if (storesReloadInterval > 0) {
this.configurationChangeMonitor = Optional.of(
this.makeConfigurationChangeMonitor(storesReloadInterval, sslContextFactory));
}
conn.addFirstConnectionFactory(new SslConnectionFactory(sslContextFactory,
HttpVersion.HTTP_1_1.asString()));
return conn;
}
private Timer makeConfigurationChangeMonitor(long reloadInterval,
SslContextFactory.Server sslContextFactory) {
java.util.Timer timer = new java.util.Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
//
// The Jetty SSLContextFactory provides a 'reload' method which will reload both
// truststore and keystore certificates.
//
timer.schedule(new FileMonitoringTimerTask(
Paths.get(keyStore),
path -> {
LOG.info("Reloading certificates from store keystore " + keyStore);
try {
sslContextFactory.reload(factory -> { });
} catch (Exception ex) {
LOG.error("Failed to reload SSL keystore certificates", ex);
}
},null),
reloadInterval,
reloadInterval
);
return timer;
}
private void setEnabledProtocols(SslContextFactory sslContextFactory) {
String enabledProtocols = conf.get(SSLFactory.SSL_ENABLED_PROTOCOLS_KEY,
SSLFactory.SSL_ENABLED_PROTOCOLS_DEFAULT);
@ -622,6 +663,7 @@ private HttpServer2(final Builder b) throws IOException {
this.webAppContext = createWebAppContext(b, adminsAcl, appDir);
this.xFrameOptionIsEnabled = b.xFrameEnabled;
this.xFrameOption = b.xFrameOption;
this.configurationChangeMonitor = b.configurationChangeMonitor;
try {
this.secretProvider =
@ -1420,6 +1462,16 @@ void openListeners() throws Exception {
*/
public void stop() throws Exception {
MultiException exception = null;
if (this.configurationChangeMonitor.isPresent()) {
try {
this.configurationChangeMonitor.get().cancel();
} catch (Exception e) {
LOG.error(
"Error while canceling configuration monitoring timer for webapp"
+ webAppContext.getDisplayName(), e);
exception = addMultiException(exception, e);
}
}
for (ServerConnector c : listeners) {
try {
c.close();

View File

@ -29,20 +29,20 @@
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.text.MessageFormat;
import java.util.Timer;
/**
* {@link KeyStoresFactory} implementation that reads the certificates from
* keystore files.
* <p>
* if the trust certificates keystore file changes, the {@link TrustManager}
* is refreshed with the new trust certificate entries (using a
* {@link ReloadingX509TrustManager} trustmanager).
* If either the truststore or the keystore certificates file changes, it
* would be refreshed under the corresponding wrapper implementation -
* {@link ReloadingX509KeystoreManager} or {@link ReloadingX509TrustManager}.
* </p>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -51,6 +51,19 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
private static final Logger LOG =
LoggerFactory.getLogger(FileBasedKeyStoresFactory.class);
/**
* The name of the timer thread monitoring file changes.
*/
public static final String SSL_MONITORING_THREAD_NAME = "SSL Certificates Store Monitor";
/**
* The refresh interval used to check if either of the truststore or keystore
* certificate file has changed.
*/
public static final String SSL_STORES_RELOAD_INTERVAL_TPL_KEY =
"ssl.{0}.stores.reload.interval";
public static final String SSL_KEYSTORE_LOCATION_TPL_KEY =
"ssl.{0}.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY =
@ -77,14 +90,119 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory {
public static final String DEFAULT_KEYSTORE_TYPE = "jks";
/**
* Reload interval in milliseconds.
* The default time interval in milliseconds used to check if either
* of the truststore or keystore certificates file has changed and needs reloading.
*/
public static final int DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL = 10000;
public static final int DEFAULT_SSL_STORES_RELOAD_INTERVAL = 10000;
private Configuration conf;
private KeyManager[] keyManagers;
private TrustManager[] trustManagers;
private ReloadingX509TrustManager trustManager;
private Timer fileMonitoringTimer;
private void createTrustManagersFromConfiguration(SSLFactory.Mode mode,
String truststoreType,
String truststoreLocation,
long storesReloadInterval)
throws IOException, GeneralSecurityException {
String passwordProperty = resolvePropertyName(mode,
SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
String truststorePassword = getPassword(conf, passwordProperty, "");
if (truststorePassword.isEmpty()) {
// An empty trust store password is legal; the trust store password
// is only required when writing to a trust store. Otherwise it's
// an optional integrity check.
truststorePassword = null;
}
// Check if obsolete truststore specific reload interval is present for backward compatible
long truststoreReloadInterval =
conf.getLong(
resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
storesReloadInterval);
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation +
", reloading at " + truststoreReloadInterval + " millis.");
}
trustManager = new ReloadingX509TrustManager(
truststoreType,
truststoreLocation,
truststorePassword);
if (truststoreReloadInterval > 0) {
fileMonitoringTimer.schedule(
new FileMonitoringTimerTask(
Paths.get(truststoreLocation),
path -> trustManager.loadFrom(path),
exception -> LOG.error(ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE, exception)),
truststoreReloadInterval,
truststoreReloadInterval);
}
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
}
trustManagers = new TrustManager[]{trustManager};
}
/**
* Implements logic of initializing the KeyManagers with the options
* to reload keystores.
* @param mode client or server
* @param keystoreType The keystore type.
* @param storesReloadInterval The interval to check if the keystore certificates
* file has changed.
*/
private void createKeyManagersFromConfiguration(SSLFactory.Mode mode,
String keystoreType, long storesReloadInterval)
throws GeneralSecurityException, IOException {
String locationProperty =
resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
String keystoreLocation = conf.get(locationProperty, "");
if (keystoreLocation.isEmpty()) {
throw new GeneralSecurityException("The property '" + locationProperty +
"' has not been set in the ssl configuration file.");
}
String passwordProperty =
resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
String keystorePassword = getPassword(conf, passwordProperty, "");
if (keystorePassword.isEmpty()) {
throw new GeneralSecurityException("The property '" + passwordProperty +
"' has not been set in the ssl configuration file.");
}
String keyPasswordProperty =
resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
// Key password defaults to the same value as store password for
// compatibility with legacy configurations that did not use a separate
// configuration property for key password.
String keystoreKeyPassword = getPassword(
conf, keyPasswordProperty, keystorePassword);
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
}
ReloadingX509KeystoreManager keystoreManager = new ReloadingX509KeystoreManager(
keystoreType,
keystoreLocation,
keystorePassword,
keystoreKeyPassword);
if (storesReloadInterval > 0) {
fileMonitoringTimer.schedule(
new FileMonitoringTimerTask(
Paths.get(keystoreLocation),
path -> keystoreManager.loadFrom(path),
exception -> LOG.error(ReloadingX509KeystoreManager.RELOAD_ERROR_MESSAGE, exception)),
storesReloadInterval,
storesReloadInterval);
}
keyManagers = new KeyManager[] { keystoreManager };
}
/**
* Resolves a property name to its client/server version if applicable.
@ -139,56 +257,28 @@ public void init(SSLFactory.Mode mode)
conf.getBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY,
SSLFactory.SSL_REQUIRE_CLIENT_CERT_DEFAULT);
long storesReloadInterval = conf.getLong(
resolvePropertyName(mode, SSL_STORES_RELOAD_INTERVAL_TPL_KEY),
DEFAULT_SSL_STORES_RELOAD_INTERVAL);
fileMonitoringTimer = new Timer(SSL_MONITORING_THREAD_NAME, true);
// certificate store
String keystoreType =
conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
DEFAULT_KEYSTORE_TYPE);
KeyStore keystore = KeyStore.getInstance(keystoreType);
String keystoreKeyPassword = null;
if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
String locationProperty =
resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
String keystoreLocation = conf.get(locationProperty, "");
if (keystoreLocation.isEmpty()) {
throw new GeneralSecurityException("The property '" + locationProperty +
"' has not been set in the ssl configuration file.");
}
String passwordProperty =
resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
String keystorePassword = getPassword(conf, passwordProperty, "");
if (keystorePassword.isEmpty()) {
throw new GeneralSecurityException("The property '" + passwordProperty +
"' has not been set in the ssl configuration file.");
}
String keyPasswordProperty =
resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY);
// Key password defaults to the same value as store password for
// compatibility with legacy configurations that did not use a separate
// configuration property for key password.
keystoreKeyPassword = getPassword(
conf, keyPasswordProperty, keystorePassword);
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
}
conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
DEFAULT_KEYSTORE_TYPE);
InputStream is = Files.newInputStream(Paths.get(keystoreLocation));
try {
keystore.load(is, keystorePassword.toCharArray());
} finally {
is.close();
}
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " Loaded KeyStore: " + keystoreLocation);
}
if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
createKeyManagersFromConfiguration(mode, keystoreType, storesReloadInterval);
} else {
KeyStore keystore = KeyStore.getInstance(keystoreType);
keystore.load(null, null);
KeyManagerFactory keyMgrFactory = KeyManagerFactory
.getInstance(SSLFactory.SSLCERTIFICATE);
keyMgrFactory.init(keystore, null);
keyManagers = keyMgrFactory.getKeyManagers();
}
KeyManagerFactory keyMgrFactory = KeyManagerFactory
.getInstance(SSLFactory.SSLCERTIFICATE);
keyMgrFactory.init(keystore, (keystoreKeyPassword != null) ?
keystoreKeyPassword.toCharArray() : null);
keyManagers = keyMgrFactory.getKeyManagers();
//trust store
String truststoreType =
@ -199,33 +289,7 @@ public void init(SSLFactory.Mode mode)
resolvePropertyName(mode, SSL_TRUSTSTORE_LOCATION_TPL_KEY);
String truststoreLocation = conf.get(locationProperty, "");
if (!truststoreLocation.isEmpty()) {
String passwordProperty = resolvePropertyName(mode,
SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
String truststorePassword = getPassword(conf, passwordProperty, "");
if (truststorePassword.isEmpty()) {
// An empty trust store password is legal; the trust store password
// is only required when writing to a trust store. Otherwise it's
// an optional integrity check.
truststorePassword = null;
}
long truststoreReloadInterval =
conf.getLong(
resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL);
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation);
}
trustManager = new ReloadingX509TrustManager(truststoreType,
truststoreLocation,
truststorePassword,
truststoreReloadInterval);
trustManager.init();
if (LOG.isDebugEnabled()) {
LOG.debug(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
}
trustManagers = new TrustManager[]{trustManager};
createTrustManagersFromConfiguration(mode, truststoreType, truststoreLocation, storesReloadInterval);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("The property '" + locationProperty + "' has not been set, " +
@ -256,7 +320,7 @@ String getPassword(Configuration conf, String alias, String defaultPass) {
@Override
public synchronized void destroy() {
if (trustManager != null) {
trustManager.destroy();
fileMonitoringTimer.cancel();
trustManager = null;
keyManagers = null;
trustManagers = null;

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.TimerTask;
import java.util.function.Consumer;
/**
* Implements basic logic to track when a file changes on disk and call the action
* passed to the constructor when it does. An exception handler can optionally also be specified
* in the constructor, otherwise any exception occurring during process will be logged
* using this class' logger.
*/
@InterfaceAudience.Private
public class FileMonitoringTimerTask extends TimerTask {
static final Logger LOG = LoggerFactory.getLogger(FileMonitoringTimerTask.class);
@VisibleForTesting
static final String PROCESS_ERROR_MESSAGE =
"Could not process file change : ";
final private Path filePath;
final private Consumer<Path> onFileChange;
final Consumer<Throwable> onChangeFailure;
private long lastProcessed;
/**
* Create file monitoring task to be scheduled using a standard Java {@link java.util.Timer}
* instance.
*
* @param filePath The path to the file to monitor.
* @param onFileChange The function to call when the file has changed.
* @param onChangeFailure The function to call when an exception is thrown during the
* file change processing.
*/
public FileMonitoringTimerTask(Path filePath, Consumer<Path> onFileChange,
Consumer<Throwable> onChangeFailure) {
Preconditions.checkNotNull(filePath, "path to monitor disk file is not set");
Preconditions.checkNotNull(onFileChange, "action to monitor disk file is not set");
this.filePath = filePath;
this.lastProcessed = filePath.toFile().lastModified();
this.onFileChange = onFileChange;
this.onChangeFailure = onChangeFailure;
}
@Override
public void run() {
if (lastProcessed != filePath.toFile().lastModified()) {
try {
onFileChange.accept(filePath);
} catch (Throwable t) {
if (onChangeFailure != null) {
onChangeFailure.accept(t);
} else {
LOG.error(PROCESS_ERROR_MESSAGE + filePath.toString(), t);
}
}
lastProcessed = filePath.toFile().lastModified();
}
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.*;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicReference;
/**
* An implementation of <code>X509KeyManager</code> that exposes a method,
* {@link #loadFrom(Path)} to reload its configuration. Note that it is necessary
* to implement the <code>X509ExtendedKeyManager</code> to properly delegate
* the additional methods, otherwise the SSL handshake will fail.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReloadingX509KeystoreManager extends X509ExtendedKeyManager {
private static final Logger LOG = LoggerFactory.getLogger(ReloadingX509TrustManager.class);
static final String RELOAD_ERROR_MESSAGE =
"Could not load keystore (keep using existing one) : ";
final private String type;
final private String storePassword;
final private String keyPassword;
private AtomicReference<X509ExtendedKeyManager> keyManagerRef;
/**
* Construct a <code>Reloading509KeystoreManager</code>
*
* @param type type of keystore file, typically 'jks'.
* @param location local path to the keystore file.
* @param storePassword password of the keystore file.
* @param keyPassword The password of the key.
* @throws IOException
* @throws GeneralSecurityException
*/
public ReloadingX509KeystoreManager(String type, String location,
String storePassword, String keyPassword)
throws IOException, GeneralSecurityException {
this.type = type;
this.storePassword = storePassword;
this.keyPassword = keyPassword;
keyManagerRef = new AtomicReference<X509ExtendedKeyManager>();
keyManagerRef.set(loadKeyManager(Paths.get(location)));
}
@Override
public String chooseEngineClientAlias(String[] strings, Principal[] principals,
SSLEngine sslEngine) {
return keyManagerRef.get().chooseEngineClientAlias(strings, principals, sslEngine);
}
@Override
public String chooseEngineServerAlias(String s, Principal[] principals,
SSLEngine sslEngine) {
return keyManagerRef.get().chooseEngineServerAlias(s, principals, sslEngine);
}
@Override
public String[] getClientAliases(String s, Principal[] principals) {
return keyManagerRef.get().getClientAliases(s, principals);
}
@Override
public String chooseClientAlias(String[] strings, Principal[] principals,
Socket socket) {
return keyManagerRef.get().chooseClientAlias(strings, principals, socket);
}
@Override
public String[] getServerAliases(String s, Principal[] principals) {
return keyManagerRef.get().getServerAliases(s, principals);
}
@Override
public String chooseServerAlias(String s, Principal[] principals,
Socket socket) {
return keyManagerRef.get().chooseServerAlias(s, principals, socket);
}
@Override
public X509Certificate[] getCertificateChain(String s) {
return keyManagerRef.get().getCertificateChain(s);
}
@Override
public PrivateKey getPrivateKey(String s) {
return keyManagerRef.get().getPrivateKey(s);
}
public ReloadingX509KeystoreManager loadFrom(Path path) {
try {
this.keyManagerRef.set(loadKeyManager(path));
} catch (Exception ex) {
// The Consumer.accept interface forces us to convert to unchecked
throw new RuntimeException(ex);
}
return this;
}
private X509ExtendedKeyManager loadKeyManager(Path path)
throws IOException, GeneralSecurityException {
X509ExtendedKeyManager keyManager = null;
KeyStore keystore = KeyStore.getInstance(type);
try (InputStream is = Files.newInputStream(path)) {
keystore.load(is, this.storePassword.toCharArray());
}
LOG.debug(" Loaded KeyStore: " + path.toFile().getAbsolutePath());
KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance(
SSLFactory.SSLCERTIFICATE);
keyMgrFactory.init(keystore,
(keyPassword != null) ? keyPassword.toCharArray() : null);
for (KeyManager candidate: keyMgrFactory.getKeyManagers()) {
if (candidate instanceof X509ExtendedKeyManager) {
keyManager = (X509ExtendedKeyManager)candidate;
break;
}
}
return keyManager;
}
}

View File

@ -32,6 +32,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateException;
@ -39,31 +41,23 @@
import java.util.concurrent.atomic.AtomicReference;
/**
* A {@link TrustManager} implementation that reloads its configuration when
* the truststore file on disk changes.
* A {@link TrustManager} implementation that exposes a method, {@link #loadFrom(Path)}
* to reload its configuration for example when the truststore file on disk changes.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ReloadingX509TrustManager
implements X509TrustManager, Runnable {
public final class ReloadingX509TrustManager implements X509TrustManager {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(ReloadingX509TrustManager.class);
@VisibleForTesting
static final String RELOAD_ERROR_MESSAGE =
"Could not load truststore (keep using existing one) : ";
private String type;
private File file;
private String password;
private long lastLoaded;
private long reloadInterval;
private AtomicReference<X509TrustManager> trustManagerRef;
private volatile boolean running;
private Thread reloader;
/**
* Creates a reloadable trustmanager. The trustmanager reloads itself
* if the underlying trustore file has changed.
@ -71,49 +65,18 @@ public final class ReloadingX509TrustManager
* @param type type of truststore file, typically 'jks'.
* @param location local path to the truststore file.
* @param password password of the truststore file.
* @param reloadInterval interval to check if the truststore file has
* changed, in milliseconds.
* @throws IOException thrown if the truststore could not be initialized due
* to an IO error.
* @throws GeneralSecurityException thrown if the truststore could not be
* initialized due to a security error.
*/
public ReloadingX509TrustManager(String type, String location,
String password, long reloadInterval)
public ReloadingX509TrustManager(String type, String location, String password)
throws IOException, GeneralSecurityException {
this.type = type;
file = new File(location);
this.password = password;
trustManagerRef = new AtomicReference<X509TrustManager>();
trustManagerRef.set(loadTrustManager());
this.reloadInterval = reloadInterval;
}
/**
* Starts the reloader thread.
*/
public void init() {
reloader = new Thread(this, "Truststore reloader thread");
reloader.setDaemon(true);
running = true;
reloader.start();
}
/**
* Stops the reloader thread.
*/
public void destroy() {
running = false;
reloader.interrupt();
}
/**
* Returns the reload check interval.
*
* @return the reload check interval, in milliseconds.
*/
public long getReloadInterval() {
return reloadInterval;
trustManagerRef.set(loadTrustManager(Paths.get(location)));
}
@Override
@ -151,27 +114,24 @@ public X509Certificate[] getAcceptedIssuers() {
return issuers;
}
boolean needsReload() {
boolean reload = true;
if (file.exists()) {
if (file.lastModified() == lastLoaded) {
reload = false;
}
} else {
lastLoaded = 0;
public ReloadingX509TrustManager loadFrom(Path path) {
try {
this.trustManagerRef.set(loadTrustManager(path));
} catch (Exception ex) {
// The Consumer.accept interface forces us to convert to unchecked
throw new RuntimeException(RELOAD_ERROR_MESSAGE, ex);
}
return reload;
return this;
}
X509TrustManager loadTrustManager()
X509TrustManager loadTrustManager(Path path)
throws IOException, GeneralSecurityException {
X509TrustManager trustManager = null;
KeyStore ks = KeyStore.getInstance(type);
InputStream in = Files.newInputStream(file.toPath());
InputStream in = Files.newInputStream(path);
try {
ks.load(in, (password == null) ? null : password.toCharArray());
lastLoaded = file.lastModified();
LOG.debug("Loaded truststore '" + file + "'");
LOG.debug("Loaded truststore '" + path + "'");
} finally {
in.close();
}
@ -188,23 +148,4 @@ X509TrustManager loadTrustManager()
}
return trustManager;
}
@Override
public void run() {
while (running) {
try {
Thread.sleep(reloadInterval);
} catch (InterruptedException e) {
//NOP
}
if (running && needsReload()) {
try {
trustManagerRef.set(loadTrustManager());
} catch (Exception ex) {
LOG.warn(RELOAD_ERROR_MESSAGE + ex.toString(), ex);
}
}
}
}
}

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.thirdparty.com.google.common.base.Supplier;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.Timer;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class TestReloadingX509KeyManager {
private static final String BASEDIR = GenericTestUtils.getTempPath(
TestReloadingX509TrustManager.class.getSimpleName());
private final GenericTestUtils.LogCapturer reloaderLog = GenericTestUtils.LogCapturer.captureLogs(
FileMonitoringTimerTask.LOG);
@BeforeClass
public static void setUp() throws Exception {
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
}
@Test(expected = IOException.class)
public void testLoadMissingKeyStore() throws Exception {
String keystoreLocation = BASEDIR + "/testmissing.jks";
ReloadingX509KeystoreManager tm =
new ReloadingX509KeystoreManager("jks", keystoreLocation,
"password",
"password");
}
@Test(expected = IOException.class)
public void testLoadCorruptKeyStore() throws Exception {
String keystoreLocation = BASEDIR + "/testcorrupt.jks";
OutputStream os = new FileOutputStream(keystoreLocation);
os.write(1);
os.close();
ReloadingX509KeystoreManager tm =
new ReloadingX509KeystoreManager("jks", keystoreLocation,
"password",
"password");
}
@Test (timeout = 3000000)
public void testReload() throws Exception {
KeyPair kp = generateKeyPair("RSA");
X509Certificate sCert = generateCertificate("CN=localhost, O=server", kp, 30,
"SHA1withRSA");
String keystoreLocation = BASEDIR + "/testreload.jks";
createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), sCert);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
ReloadingX509KeystoreManager tm =
new ReloadingX509KeystoreManager("jks", keystoreLocation,
"password",
"password");
try {
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
// Wait so that the file modification time is different
Thread.sleep((reloadInterval+ 1000));
// Change the certificate with a new keypair
final KeyPair anotherKP = generateKeyPair("RSA");
sCert = KeyStoreTestUtil.generateCertificate("CN=localhost, O=server", anotherKP, 30,
"SHA1withRSA");
createKeyStore(keystoreLocation, "password", "cert1", anotherKP.getPrivate(), sCert);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return tm.getPrivateKey("cert1").equals(kp.getPrivate());
}
}, (int) reloadInterval, 100000);
} finally {
fileMonitoringTimer.cancel();
}
}
@Test (timeout = 30000)
public void testReloadMissingTrustStore() throws Exception {
KeyPair kp = generateKeyPair("RSA");
X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
String keystoreLocation = BASEDIR + "/testmissing.jks";
createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
ReloadingX509KeystoreManager tm =
new ReloadingX509KeystoreManager("jks", keystoreLocation,
"password",
"password");
try {
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
assertFalse(reloaderLog.getOutput().contains(
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
// Wait for the first reload to happen so we actually detect a change after the delete
Thread.sleep((reloadInterval+ 1000));
new File(keystoreLocation).delete();
// Wait for the reload to happen and log to get written to
Thread.sleep((reloadInterval+ 1000));
waitForFailedReloadAtLeastOnce((int) reloadInterval);
assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
} finally {
reloaderLog.stopCapturing();
fileMonitoringTimer.cancel();
}
}
@Test (timeout = 30000)
public void testReloadCorruptTrustStore() throws Exception {
KeyPair kp = generateKeyPair("RSA");
X509Certificate cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
String keystoreLocation = BASEDIR + "/testmissing.jks";
createKeyStore(keystoreLocation, "password", "cert1", kp.getPrivate(), cert1);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
ReloadingX509KeystoreManager tm =
new ReloadingX509KeystoreManager("jks", keystoreLocation,
"password",
"password");
try {
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(keystoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
// Wait so that the file modification time is different
Thread.sleep((reloadInterval + 1000));
assertFalse(reloaderLog.getOutput().contains(
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
OutputStream os = new FileOutputStream(keystoreLocation);
os.write(1);
os.close();
waitForFailedReloadAtLeastOnce((int) reloadInterval);
assertEquals(kp.getPrivate(), tm.getPrivateKey("cert1"));
} finally {
reloaderLog.stopCapturing();
fileMonitoringTimer.cancel();
}
}
/**Wait for the reloader thread to load the configurations at least once
* by probing the log of the thread if the reload fails.
*/
private void waitForFailedReloadAtLeastOnce(int reloadInterval)
throws InterruptedException, TimeoutException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return reloaderLog.getOutput().contains(
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
}
}, reloadInterval, 10 * 1000);
}
}

View File

@ -30,10 +30,12 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
@ -50,7 +52,7 @@ public class TestReloadingX509TrustManager {
private X509Certificate cert1;
private X509Certificate cert2;
private final LogCapturer reloaderLog = LogCapturer.captureLogs(
ReloadingX509TrustManager.LOG);
FileMonitoringTimerTask.LOG);
@BeforeClass
public static void setUp() throws Exception {
@ -64,12 +66,7 @@ public void testLoadMissingTrustStore() throws Exception {
String truststoreLocation = BASEDIR + "/testmissing.jks";
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
} finally {
tm.destroy();
}
new ReloadingX509TrustManager("jks", truststoreLocation, "password");
}
@Test(expected = IOException.class)
@ -80,12 +77,7 @@ public void testLoadCorruptTrustStore() throws Exception {
os.close();
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
} finally {
tm.destroy();
}
new ReloadingX509TrustManager("jks", truststoreLocation, "password");
}
@Test (timeout = 30000)
@ -96,14 +88,17 @@ public void testReload() throws Exception {
String truststoreLocation = BASEDIR + "/testreload.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
final ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
new ReloadingX509TrustManager("jks", truststoreLocation, "password");
try {
tm.init();
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(1, tm.getAcceptedIssuers().length);
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 1000));
Thread.sleep((reloadInterval+ 1000));
// Add another cert
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
@ -116,9 +111,9 @@ public void testReload() throws Exception {
public Boolean get() {
return tm.getAcceptedIssuers().length == 2;
}
}, (int) tm.getReloadInterval(), 10000);
}, (int) reloadInterval, 100000);
} finally {
tm.destroy();
fileMonitoringTimer.cancel();
}
}
@ -130,27 +125,38 @@ public void testReloadMissingTrustStore() throws Exception {
String truststoreLocation = BASEDIR + "/testmissing.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
new ReloadingX509TrustManager("jks", truststoreLocation, "password");
try {
tm.init();
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(1, tm.getAcceptedIssuers().length);
X509Certificate cert = tm.getAcceptedIssuers()[0];
assertFalse(reloaderLog.getOutput().contains(
ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
// Wait for the first reload to happen so we actually detect a change after the delete
Thread.sleep((reloadInterval+ 1000));
new File(truststoreLocation).delete();
waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
// Wait for the reload to happen and log to get written to
Thread.sleep((reloadInterval+ 1000));
waitForFailedReloadAtLeastOnce((int) reloadInterval);
assertEquals(1, tm.getAcceptedIssuers().length);
assertEquals(cert, tm.getAcceptedIssuers()[0]);
} finally {
reloaderLog.stopCapturing();
tm.destroy();
fileMonitoringTimer.cancel();
}
}
@Test (timeout = 30000)
public void testReloadCorruptTrustStore() throws Exception {
KeyPair kp = generateKeyPair("RSA");
@ -159,29 +165,32 @@ public void testReloadCorruptTrustStore() throws Exception {
String truststoreLocation = BASEDIR + "/testcorrupt.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
long reloadInterval = 10;
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
new ReloadingX509TrustManager("jks", truststoreLocation, "password");
try {
tm.init();
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(truststoreLocation), tm::loadFrom,null), reloadInterval, reloadInterval);
assertEquals(1, tm.getAcceptedIssuers().length);
final X509Certificate cert = tm.getAcceptedIssuers()[0];
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 1000));
Thread.sleep((reloadInterval + 1000));
assertFalse(reloaderLog.getOutput().contains(
ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE));
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE));
OutputStream os = new FileOutputStream(truststoreLocation);
os.write(1);
os.close();
waitForFailedReloadAtLeastOnce((int) tm.getReloadInterval());
waitForFailedReloadAtLeastOnce((int) reloadInterval);
assertEquals(1, tm.getAcceptedIssuers().length);
assertEquals(cert, tm.getAcceptedIssuers()[0]);
} finally {
reloaderLog.stopCapturing();
tm.destroy();
fileMonitoringTimer.cancel();
}
}
@ -194,7 +203,7 @@ private void waitForFailedReloadAtLeastOnce(int reloadInterval)
@Override
public Boolean get() {
return reloaderLog.getOutput().contains(
ReloadingX509TrustManager.RELOAD_ERROR_MESSAGE);
FileMonitoringTimerTask.PROCESS_ERROR_MESSAGE);
}
}, reloadInterval, 10 * 1000);
}
@ -208,13 +217,15 @@ public void testNoPassword() throws Exception {
String truststoreLocation = BASEDIR + "/testreload.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
Timer fileMonitoringTimer = new Timer(FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME, true);
final ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, null, 10);
new ReloadingX509TrustManager("jks", truststoreLocation, null);
try {
tm.init();
fileMonitoringTimer.schedule(new FileMonitoringTimerTask(
Paths.get(truststoreLocation), tm::loadFrom,null), 10, 10);
assertEquals(1, tm.getAcceptedIssuers().length);
} finally {
tm.destroy();
fileMonitoringTimer.cancel();
}
}
}

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.test.GenericTestUtils;
@ -99,7 +100,7 @@ public void testSSLFactoryCleanup() throws Exception {
Thread reloaderThread = null;
for (Thread thread : threads) {
if ((thread.getName() != null)
&& (thread.getName().contains("Truststore reloader thread"))) {
&& (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
reloaderThread = thread;
}
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import static org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory.SSL_MONITORING_THREAD_NAME;
import org.apache.hadoop.test.TestGenericTestUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@ -476,7 +477,7 @@ public void testTimelineClientCleanup() throws Exception {
Thread reloaderThread = null;
for (Thread thread : threads) {
if ((thread.getName() != null)
&& (thread.getName().contains("Truststore reloader thread"))) {
&& (thread.getName().contains(SSL_MONITORING_THREAD_NAME))) {
reloaderThread = thread;
}
}

View File

@ -11,7 +11,8 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>