MAPREDUCE-4417. add support for encrypted shuffle (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1365979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-07-26 13:23:05 +00:00
parent 8f395c2f78
commit 9d16c9354b
22 changed files with 2688 additions and 32 deletions

View File

@ -1,6 +1,21 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
@ -21,7 +36,15 @@
<property>
<name>ssl.client.truststore.type</name>
<value>jks</value>
<description>Optional. Default value is "jks".
<description>Optional. The keystore file format, default value is "jks".
</description>
</property>
<property>
<name>ssl.client.truststore.reload.interval</name>
<value>10000</value>
<description>Truststore reload check interval, in milliseconds.
Default value is 10000 (10 seconds).
</description>
</property>
@ -50,7 +73,7 @@
<property>
<name>ssl.client.keystore.type</name>
<value>jks</value>
<description>Optional. Default value is "jks".
<description>Optional. The keystore file format, default value is "jks".
</description>
</property>

View File

@ -1,6 +1,21 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<property>
@ -20,10 +35,17 @@
<property>
<name>ssl.server.truststore.type</name>
<value>jks</value>
<description>Optional. Default value is "jks".
<description>Optional. The keystore file format, default value is "jks".
</description>
</property>
<property>
<name>ssl.server.truststore.reload.interval</name>
<value>10000</value>
<description>Truststore reload check interval, in milliseconds.
Default value is 10000 (10 seconds).
</property>
<property>
<name>ssl.server.keystore.location</name>
<value></value>
@ -48,7 +70,7 @@
<property>
<name>ssl.server.keystore.type</name>
<value>jks</value>
<description>Optional. Default value is "jks".
<description>Optional. The keystore file format, default value is "jks".
</description>
</property>

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.text.MessageFormat;
/**
* {@link KeyStoresFactory} implementation that reads the certificates from
* keystore files.
* <p/>
* if the trust certificates keystore file changes, the {@link TrustManager}
* is refreshed with the new trust certificate entries (using a
* {@link ReloadingX509TrustManager} trustmanager).
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FileBasedKeyStoresFactory implements KeyStoresFactory {
private static final Log LOG =
LogFactory.getLog(FileBasedKeyStoresFactory.class);
public static final String SSL_KEYSTORE_LOCATION_TPL_KEY =
"ssl.{0}.keystore.location";
public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY =
"ssl.{0}.keystore.password";
public static final String SSL_KEYSTORE_TYPE_TPL_KEY =
"ssl.{0}.keystore.type";
public static final String SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY =
"ssl.{0}.truststore.reload.interval";
public static final String SSL_TRUSTSTORE_LOCATION_TPL_KEY =
"ssl.{0}.truststore.location";
public static final String SSL_TRUSTSTORE_PASSWORD_TPL_KEY =
"ssl.{0}.truststore.password";
public static final String SSL_TRUSTSTORE_TYPE_TPL_KEY =
"ssl.{0}.truststore.type";
/**
* Default format of the keystore files.
*/
public static final String DEFAULT_KEYSTORE_TYPE = "jks";
/**
* Reload interval in milliseconds.
*/
public static final int DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL = 10000;
private Configuration conf;
private KeyManager[] keyManagers;
private TrustManager[] trustManagers;
private ReloadingX509TrustManager trustManager;
/**
* Resolves a property name to its client/server version if applicable.
* <p/>
* NOTE: This method is public for testing purposes.
*
* @param mode client/server mode.
* @param template property name template.
* @return the resolved property name.
*/
@VisibleForTesting
public static String resolvePropertyName(SSLFactory.Mode mode,
String template) {
return MessageFormat.format(template, mode.toString().toLowerCase());
}
/**
* Sets the configuration for the factory.
*
* @param conf the configuration for the factory.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Returns the configuration of the factory.
*
* @return the configuration of the factory.
*/
@Override
public Configuration getConf() {
return conf;
}
/**
* Initializes the keystores of the factory.
*
* @param mode if the keystores are to be used in client or server mode.
* @throws IOException thrown if the keystores could not be initialized due
* to an IO error.
* @throws GeneralSecurityException thrown if the keystores could not be
* initialized due to a security error.
*/
public void init(SSLFactory.Mode mode)
throws IOException, GeneralSecurityException {
boolean requireClientCert =
conf.getBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY, true);
// certificate store
String keystoreType =
conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY),
DEFAULT_KEYSTORE_TYPE);
KeyStore keystore = KeyStore.getInstance(keystoreType);
String keystorePassword = null;
if (requireClientCert || mode == SSLFactory.Mode.SERVER) {
String locationProperty =
resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY);
String keystoreLocation = conf.get(locationProperty, "");
if (keystoreLocation.isEmpty()) {
throw new GeneralSecurityException("The property '" + locationProperty +
"' has not been set in the ssl configuration file.");
}
String passwordProperty =
resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY);
keystorePassword = conf.get(passwordProperty, "");
if (keystorePassword.isEmpty()) {
throw new GeneralSecurityException("The property '" + passwordProperty +
"' has not been set in the ssl configuration file.");
}
LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation);
InputStream is = new FileInputStream(keystoreLocation);
try {
keystore.load(is, keystorePassword.toCharArray());
} finally {
is.close();
}
LOG.info(mode.toString() + " Loaded KeyStore: " + keystoreLocation);
} else {
keystore.load(null, null);
}
KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance("SunX509");
keyMgrFactory.init(keystore, (keystorePassword != null) ?
keystorePassword.toCharArray() : null);
keyManagers = keyMgrFactory.getKeyManagers();
//trust store
String truststoreType =
conf.get(resolvePropertyName(mode, SSL_TRUSTSTORE_TYPE_TPL_KEY),
DEFAULT_KEYSTORE_TYPE);
String locationProperty =
resolvePropertyName(mode, SSL_TRUSTSTORE_LOCATION_TPL_KEY);
String truststoreLocation = conf.get(locationProperty, "");
if (truststoreLocation.isEmpty()) {
throw new GeneralSecurityException("The property '" + locationProperty +
"' has not been set in the ssl configuration file.");
}
String passwordProperty = resolvePropertyName(mode,
SSL_TRUSTSTORE_PASSWORD_TPL_KEY);
String truststorePassword = conf.get(passwordProperty, "");
if (truststorePassword.isEmpty()) {
throw new GeneralSecurityException("The property '" + passwordProperty +
"' has not been set in the ssl configuration file.");
}
long truststoreReloadInterval =
conf.getLong(
resolvePropertyName(mode, SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY),
DEFAULT_SSL_TRUSTSTORE_RELOAD_INTERVAL);
LOG.debug(mode.toString() + " TrustStore: " + truststoreLocation);
trustManager = new ReloadingX509TrustManager(truststoreType,
truststoreLocation,
truststorePassword,
truststoreReloadInterval);
trustManager.init();
LOG.info(mode.toString() + " Loaded TrustStore: " + truststoreLocation);
trustManagers = new TrustManager[]{trustManager};
}
/**
* Releases any resources being used.
*/
@Override
public synchronized void destroy() {
if (trustManager != null) {
trustManager.destroy();
trustManager = null;
keyManagers = null;
trustManagers = null;
}
}
/**
* Returns the keymanagers for owned certificates.
*
* @return the keymanagers for owned certificates.
*/
@Override
public KeyManager[] getKeyManagers() {
return keyManagers;
}
/**
* Returns the trustmanagers for trusted certificates.
*
* @return the trustmanagers for trusted certificates.
*/
@Override
public TrustManager[] getTrustManagers() {
return trustManagers;
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import javax.net.ssl.KeyManager;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.security.GeneralSecurityException;
/**
* Interface that gives access to {@link KeyManager} and {@link TrustManager}
* implementations.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface KeyStoresFactory extends Configurable {
/**
* Initializes the keystores of the factory.
*
* @param mode if the keystores are to be used in client or server mode.
* @throws IOException thrown if the keystores could not be initialized due
* to an IO error.
* @throws GeneralSecurityException thrown if the keystores could not be
* initialized due to an security error.
*/
public void init(SSLFactory.Mode mode) throws IOException, GeneralSecurityException;
/**
* Releases any resources being used.
*/
public void destroy();
/**
* Returns the keymanagers for owned certificates.
*
* @return the keymanagers for owned certificates.
*/
public KeyManager[] getKeyManagers();
/**
* Returns the trustmanagers for trusted certificates.
*
* @return the trustmanagers for trusted certificates.
*/
public TrustManager[] getTrustManagers();
}

View File

@ -0,0 +1,204 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicReference;
/**
* A {@link TrustManager} implementation that reloads its configuration when
* the truststore file on disk changes.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ReloadingX509TrustManager
implements X509TrustManager, Runnable {
private static final Log LOG =
LogFactory.getLog(ReloadingX509TrustManager.class);
private String type;
private File file;
private String password;
private long lastLoaded;
private long reloadInterval;
private AtomicReference<X509TrustManager> trustManagerRef;
private volatile boolean running;
private Thread reloader;
/**
* Creates a reloadable trustmanager. The trustmanager reloads itself
* if the underlying trustore file has changed.
*
* @param type type of truststore file, typically 'jks'.
* @param location local path to the truststore file.
* @param password password of the truststore file.
* @param reloadInterval interval to check if the truststore file has
* changed, in milliseconds.
* @throws IOException thrown if the truststore could not be initialized due
* to an IO error.
* @throws GeneralSecurityException thrown if the truststore could not be
* initialized due to a security error.
*/
public ReloadingX509TrustManager(String type, String location,
String password, long reloadInterval)
throws IOException, GeneralSecurityException {
this.type = type;
file = new File(location);
this.password = password;
trustManagerRef = new AtomicReference<X509TrustManager>();
trustManagerRef.set(loadTrustManager());
this.reloadInterval = reloadInterval;
}
/**
* Starts the reloader thread.
*/
public void init() {
reloader = new Thread(this, "Truststore reloader thread");
reloader.setDaemon(true);
running = true;
reloader.start();
}
/**
* Stops the reloader thread.
*/
public void destroy() {
running = false;
reloader.interrupt();
}
/**
* Returns the reload check interval.
*
* @return the reload check interval, in milliseconds.
*/
public long getReloadInterval() {
return reloadInterval;
}
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
X509TrustManager tm = trustManagerRef.get();
if (tm != null) {
tm.checkClientTrusted(chain, authType);
} else {
throw new CertificateException("Unknown client chain certificate: " +
chain[0].toString());
}
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
X509TrustManager tm = trustManagerRef.get();
if (tm != null) {
tm.checkServerTrusted(chain, authType);
} else {
throw new CertificateException("Unknown server chain certificate: " +
chain[0].toString());
}
}
private static final X509Certificate[] EMPTY = new X509Certificate[0];
@Override
public X509Certificate[] getAcceptedIssuers() {
X509Certificate[] issuers = EMPTY;
X509TrustManager tm = trustManagerRef.get();
if (tm != null) {
issuers = tm.getAcceptedIssuers();
}
return issuers;
}
boolean needsReload() {
boolean reload = true;
if (file.exists()) {
if (file.lastModified() == lastLoaded) {
reload = false;
}
} else {
lastLoaded = 0;
}
return reload;
}
X509TrustManager loadTrustManager()
throws IOException, GeneralSecurityException {
X509TrustManager trustManager = null;
KeyStore ks = KeyStore.getInstance(type);
lastLoaded = file.lastModified();
FileInputStream in = new FileInputStream(file);
try {
ks.load(in, password.toCharArray());
LOG.debug("Loaded truststore '" + file + "'");
} finally {
in.close();
}
TrustManagerFactory trustManagerFactory =
TrustManagerFactory.getInstance("SunX509");
trustManagerFactory.init(ks);
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
for (TrustManager trustManager1 : trustManagers) {
if (trustManager1 instanceof X509TrustManager) {
trustManager = (X509TrustManager) trustManager1;
break;
}
}
return trustManager;
}
@Override
public void run() {
while (running) {
try {
Thread.sleep(reloadInterval);
} catch (InterruptedException e) {
//NOP
}
if (running && needsReload()) {
try {
trustManagerRef.set(loadTrustManager());
} catch (Exception ex) {
LOG.warn("Could not load truststore (keep using existing one) : " +
ex.toString(), ex);
}
}
}
}
}

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLServerSocketFactory;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.security.GeneralSecurityException;
/**
* Factory that creates SSLEngine and SSLSocketFactory instances using
* Hadoop configuration information.
* <p/>
* This SSLFactory uses a {@link ReloadingX509TrustManager} instance,
* which reloads public keys if the truststore file changes.
* <p/>
* This factory is used to configure HTTPS in Hadoop HTTP based endpoints, both
* client and server.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class SSLFactory {
@InterfaceAudience.Private
public static enum Mode { CLIENT, SERVER }
public static final String SSL_REQUIRE_CLIENT_CERT_KEY =
"hadoop.ssl.require.client.cert";
public static final String SSL_HOSTNAME_VERIFIER_KEY =
"hadoop.ssl.hostname.verifier";
public static final String SSL_CLIENT_CONF_KEY =
"hadoop.ssl.client.conf";
public static final String SSL_SERVER_CONF_KEY =
"hadoop.ssl.server.conf";
public static final boolean DEFAULT_SSL_REQUIRE_CLIENT_CERT = false;
public static final String KEYSTORES_FACTORY_CLASS_KEY =
"hadoop.ssl.keystores.factory.class";
private Configuration conf;
private Mode mode;
private boolean requireClientCert;
private SSLContext context;
private HostnameVerifier hostnameVerifier;
private KeyStoresFactory keystoresFactory;
/**
* Creates an SSLFactory.
*
* @param mode SSLFactory mode, client or server.
* @param conf Hadoop configuration from where the SSLFactory configuration
* will be read.
*/
public SSLFactory(Mode mode, Configuration conf) {
this.conf = conf;
if (mode == null) {
throw new IllegalArgumentException("mode cannot be NULL");
}
this.mode = mode;
requireClientCert = conf.getBoolean(SSL_REQUIRE_CLIENT_CERT_KEY,
DEFAULT_SSL_REQUIRE_CLIENT_CERT);
Configuration sslConf = readSSLConfiguration(mode);
Class<? extends KeyStoresFactory> klass
= conf.getClass(KEYSTORES_FACTORY_CLASS_KEY,
FileBasedKeyStoresFactory.class, KeyStoresFactory.class);
keystoresFactory = ReflectionUtils.newInstance(klass, sslConf);
}
private Configuration readSSLConfiguration(Mode mode) {
Configuration sslConf = new Configuration(false);
sslConf.setBoolean(SSL_REQUIRE_CLIENT_CERT_KEY, requireClientCert);
String sslConfResource;
if (mode == Mode.CLIENT) {
sslConfResource = conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml");
} else {
sslConfResource = conf.get(SSL_SERVER_CONF_KEY, "ssl-server.xml");
}
sslConf.addResource(sslConfResource);
return sslConf;
}
/**
* Initializes the factory.
*
* @throws GeneralSecurityException thrown if an SSL initialization error
* happened.
* @throws IOException thrown if an IO error happened while reading the SSL
* configuration.
*/
public void init() throws GeneralSecurityException, IOException {
keystoresFactory.init(mode);
context = SSLContext.getInstance("TLS");
context.init(keystoresFactory.getKeyManagers(),
keystoresFactory.getTrustManagers(), null);
hostnameVerifier = getHostnameVerifier(conf);
}
private HostnameVerifier getHostnameVerifier(Configuration conf)
throws GeneralSecurityException, IOException {
HostnameVerifier hostnameVerifier;
String verifier =
conf.get(SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT").trim().toUpperCase();
if (verifier.equals("DEFAULT")) {
hostnameVerifier = SSLHostnameVerifier.DEFAULT;
} else if (verifier.equals("DEFAULT_AND_LOCALHOST")) {
hostnameVerifier = SSLHostnameVerifier.DEFAULT_AND_LOCALHOST;
} else if (verifier.equals("STRICT")) {
hostnameVerifier = SSLHostnameVerifier.STRICT;
} else if (verifier.equals("STRICT_IE6")) {
hostnameVerifier = SSLHostnameVerifier.STRICT_IE6;
} else if (verifier.equals("ALLOW_ALL")) {
hostnameVerifier = SSLHostnameVerifier.ALLOW_ALL;
} else {
throw new GeneralSecurityException("Invalid hostname verifier: " +
verifier);
}
return hostnameVerifier;
}
/**
* Releases any resources being used.
*/
public void destroy() {
keystoresFactory.destroy();
}
/**
* Returns the SSLFactory KeyStoresFactory instance.
*
* @return the SSLFactory KeyStoresFactory instance.
*/
public KeyStoresFactory getKeystoresFactory() {
return keystoresFactory;
}
/**
* Returns a configured SSLEngine.
*
* @return the configured SSLEngine.
* @throws GeneralSecurityException thrown if the SSL engine could not
* be initialized.
* @throws IOException thrown if and IO error occurred while loading
* the server keystore.
*/
public SSLEngine createSSLEngine()
throws GeneralSecurityException, IOException {
SSLEngine sslEngine = context.createSSLEngine();
if (mode == Mode.CLIENT) {
sslEngine.setUseClientMode(true);
} else {
sslEngine.setUseClientMode(false);
sslEngine.setNeedClientAuth(requireClientCert);
}
return sslEngine;
}
/**
* Returns a configured SSLServerSocketFactory.
*
* @return the configured SSLSocketFactory.
* @throws GeneralSecurityException thrown if the SSLSocketFactory could not
* be initialized.
* @throws IOException thrown if and IO error occurred while loading
* the server keystore.
*/
public SSLServerSocketFactory createSSLServerSocketFactory()
throws GeneralSecurityException, IOException {
if (mode != Mode.SERVER) {
throw new IllegalStateException("Factory is in CLIENT mode");
}
return context.getServerSocketFactory();
}
/**
* Returns a configured SSLSocketFactory.
*
* @return the configured SSLSocketFactory.
* @throws GeneralSecurityException thrown if the SSLSocketFactory could not
* be initialized.
* @throws IOException thrown if and IO error occurred while loading
* the server keystore.
*/
public SSLSocketFactory createSSLSocketFactory()
throws GeneralSecurityException, IOException {
if (mode != Mode.CLIENT) {
throw new IllegalStateException("Factory is in CLIENT mode");
}
return context.getSocketFactory();
}
/**
* Returns the hostname verifier it should be used in HttpsURLConnections.
*
* @return the hostname verifier.
*/
public HostnameVerifier getHostnameVerifier() {
if (mode != Mode.CLIENT) {
throw new IllegalStateException("Factory is in CLIENT mode");
}
return hostnameVerifier;
}
/**
* Returns if client certificates are required or not.
*
* @return if client certificates are required or not.
*/
public boolean isClientCertRequired() {
return requireClientCert;
}
}

View File

@ -0,0 +1,585 @@
/*
* $HeadURL$
* $Revision$
* $Date$
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.io.InputStream;
import java.security.cert.Certificate;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.StringTokenizer;
import java.util.TreeSet;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
/**
************************************************************************
* Copied from the not-yet-commons-ssl project at
* http://juliusdavies.ca/commons-ssl/
* This project is not yet in Apache, but it is Apache 2.0 licensed.
************************************************************************
* Interface for checking if a hostname matches the names stored inside the
* server's X.509 certificate. Correctly implements
* javax.net.ssl.HostnameVerifier, but that interface is not recommended.
* Instead we added several check() methods that take SSLSocket,
* or X509Certificate, or ultimately (they all end up calling this one),
* String. (It's easier to supply JUnit with Strings instead of mock
* SSLSession objects!)
* </p><p>Our check() methods throw exceptions if the name is
* invalid, whereas javax.net.ssl.HostnameVerifier just returns true/false.
* <p/>
* We provide the HostnameVerifier.DEFAULT, HostnameVerifier.STRICT, and
* HostnameVerifier.ALLOW_ALL implementations. We also provide the more
* specialized HostnameVerifier.DEFAULT_AND_LOCALHOST, as well as
* HostnameVerifier.STRICT_IE6. But feel free to define your own
* implementations!
* <p/>
* Inspired by Sebastian Hauer's original StrictSSLProtocolSocketFactory in the
* HttpClient "contrib" repository.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface SSLHostnameVerifier extends javax.net.ssl.HostnameVerifier {
boolean verify(String host, SSLSession session);
void check(String host, SSLSocket ssl) throws IOException;
void check(String host, X509Certificate cert) throws SSLException;
void check(String host, String[] cns, String[] subjectAlts)
throws SSLException;
void check(String[] hosts, SSLSocket ssl) throws IOException;
void check(String[] hosts, X509Certificate cert) throws SSLException;
/**
* Checks to see if the supplied hostname matches any of the supplied CNs
* or "DNS" Subject-Alts. Most implementations only look at the first CN,
* and ignore any additional CNs. Most implementations do look at all of
* the "DNS" Subject-Alts. The CNs or Subject-Alts may contain wildcards
* according to RFC 2818.
*
* @param cns CN fields, in order, as extracted from the X.509
* certificate.
* @param subjectAlts Subject-Alt fields of type 2 ("DNS"), as extracted
* from the X.509 certificate.
* @param hosts The array of hostnames to verify.
* @throws SSLException If verification failed.
*/
void check(String[] hosts, String[] cns, String[] subjectAlts)
throws SSLException;
/**
* The DEFAULT HostnameVerifier works the same way as Curl and Firefox.
* <p/>
* The hostname must match either the first CN, or any of the subject-alts.
* A wildcard can occur in the CN, and in any of the subject-alts.
* <p/>
* The only difference between DEFAULT and STRICT is that a wildcard (such
* as "*.foo.com") with DEFAULT matches all subdomains, including
* "a.b.foo.com".
*/
public final static SSLHostnameVerifier DEFAULT =
new AbstractVerifier() {
public final void check(final String[] hosts, final String[] cns,
final String[] subjectAlts)
throws SSLException {
check(hosts, cns, subjectAlts, false, false);
}
public final String toString() { return "DEFAULT"; }
};
/**
* The DEFAULT_AND_LOCALHOST HostnameVerifier works like the DEFAULT
* one with one additional relaxation: a host of "localhost",
* "localhost.localdomain", "127.0.0.1", "::1" will always pass, no matter
* what is in the server's certificate.
*/
public final static SSLHostnameVerifier DEFAULT_AND_LOCALHOST =
new AbstractVerifier() {
public final void check(final String[] hosts, final String[] cns,
final String[] subjectAlts)
throws SSLException {
if (isLocalhost(hosts[0])) {
return;
}
check(hosts, cns, subjectAlts, false, false);
}
public final String toString() { return "DEFAULT_AND_LOCALHOST"; }
};
/**
* The STRICT HostnameVerifier works the same way as java.net.URL in Sun
* Java 1.4, Sun Java 5, Sun Java 6. It's also pretty close to IE6.
* This implementation appears to be compliant with RFC 2818 for dealing
* with wildcards.
* <p/>
* The hostname must match either the first CN, or any of the subject-alts.
* A wildcard can occur in the CN, and in any of the subject-alts. The
* one divergence from IE6 is how we only check the first CN. IE6 allows
* a match against any of the CNs present. We decided to follow in
* Sun Java 1.4's footsteps and only check the first CN.
* <p/>
* A wildcard such as "*.foo.com" matches only subdomains in the same
* level, for example "a.foo.com". It does not match deeper subdomains
* such as "a.b.foo.com".
*/
public final static SSLHostnameVerifier STRICT =
new AbstractVerifier() {
public final void check(final String[] host, final String[] cns,
final String[] subjectAlts)
throws SSLException {
check(host, cns, subjectAlts, false, true);
}
public final String toString() { return "STRICT"; }
};
/**
* The STRICT_IE6 HostnameVerifier works just like the STRICT one with one
* minor variation: the hostname can match against any of the CN's in the
* server's certificate, not just the first one. This behaviour is
* identical to IE6's behaviour.
*/
public final static SSLHostnameVerifier STRICT_IE6 =
new AbstractVerifier() {
public final void check(final String[] host, final String[] cns,
final String[] subjectAlts)
throws SSLException {
check(host, cns, subjectAlts, true, true);
}
public final String toString() { return "STRICT_IE6"; }
};
/**
* The ALLOW_ALL HostnameVerifier essentially turns hostname verification
* off. This implementation is a no-op, and never throws the SSLException.
*/
public final static SSLHostnameVerifier ALLOW_ALL =
new AbstractVerifier() {
public final void check(final String[] host, final String[] cns,
final String[] subjectAlts) {
// Allow everything - so never blowup.
}
public final String toString() { return "ALLOW_ALL"; }
};
@SuppressWarnings("unchecked")
abstract class AbstractVerifier implements SSLHostnameVerifier {
/**
* This contains a list of 2nd-level domains that aren't allowed to
* have wildcards when combined with country-codes.
* For example: [*.co.uk].
* <p/>
* The [*.co.uk] problem is an interesting one. Should we just hope
* that CA's would never foolishly allow such a certificate to happen?
* Looks like we're the only implementation guarding against this.
* Firefox, Curl, Sun Java 1.4, 5, 6 don't bother with this check.
*/
private final static String[] BAD_COUNTRY_2LDS =
{"ac", "co", "com", "ed", "edu", "go", "gouv", "gov", "info",
"lg", "ne", "net", "or", "org"};
private final static String[] LOCALHOSTS = {"::1", "127.0.0.1",
"localhost",
"localhost.localdomain"};
static {
// Just in case developer forgot to manually sort the array. :-)
Arrays.sort(BAD_COUNTRY_2LDS);
Arrays.sort(LOCALHOSTS);
}
protected AbstractVerifier() {}
/**
* The javax.net.ssl.HostnameVerifier contract.
*
* @param host 'hostname' we used to create our socket
* @param session SSLSession with the remote server
* @return true if the host matched the one in the certificate.
*/
public boolean verify(String host, SSLSession session) {
try {
Certificate[] certs = session.getPeerCertificates();
X509Certificate x509 = (X509Certificate) certs[0];
check(new String[]{host}, x509);
return true;
}
catch (SSLException e) {
return false;
}
}
public void check(String host, SSLSocket ssl) throws IOException {
check(new String[]{host}, ssl);
}
public void check(String host, X509Certificate cert)
throws SSLException {
check(new String[]{host}, cert);
}
public void check(String host, String[] cns, String[] subjectAlts)
throws SSLException {
check(new String[]{host}, cns, subjectAlts);
}
public void check(String host[], SSLSocket ssl)
throws IOException {
if (host == null) {
throw new NullPointerException("host to verify is null");
}
SSLSession session = ssl.getSession();
if (session == null) {
// In our experience this only happens under IBM 1.4.x when
// spurious (unrelated) certificates show up in the server'
// chain. Hopefully this will unearth the real problem:
InputStream in = ssl.getInputStream();
in.available();
/*
If you're looking at the 2 lines of code above because
you're running into a problem, you probably have two
options:
#1. Clean up the certificate chain that your server
is presenting (e.g. edit "/etc/apache2/server.crt"
or wherever it is your server's certificate chain
is defined).
OR
#2. Upgrade to an IBM 1.5.x or greater JVM, or switch
to a non-IBM JVM.
*/
// If ssl.getInputStream().available() didn't cause an
// exception, maybe at least now the session is available?
session = ssl.getSession();
if (session == null) {
// If it's still null, probably a startHandshake() will
// unearth the real problem.
ssl.startHandshake();
// Okay, if we still haven't managed to cause an exception,
// might as well go for the NPE. Or maybe we're okay now?
session = ssl.getSession();
}
}
Certificate[] certs;
try {
certs = session.getPeerCertificates();
} catch (SSLPeerUnverifiedException spue) {
InputStream in = ssl.getInputStream();
in.available();
// Didn't trigger anything interesting? Okay, just throw
// original.
throw spue;
}
X509Certificate x509 = (X509Certificate) certs[0];
check(host, x509);
}
public void check(String[] host, X509Certificate cert)
throws SSLException {
String[] cns = Certificates.getCNs(cert);
String[] subjectAlts = Certificates.getDNSSubjectAlts(cert);
check(host, cns, subjectAlts);
}
public void check(final String[] hosts, final String[] cns,
final String[] subjectAlts, final boolean ie6,
final boolean strictWithSubDomains)
throws SSLException {
// Build up lists of allowed hosts For logging/debugging purposes.
StringBuffer buf = new StringBuffer(32);
buf.append('<');
for (int i = 0; i < hosts.length; i++) {
String h = hosts[i];
h = h != null ? h.trim().toLowerCase() : "";
hosts[i] = h;
if (i > 0) {
buf.append('/');
}
buf.append(h);
}
buf.append('>');
String hostnames = buf.toString();
// Build the list of names we're going to check. Our DEFAULT and
// STRICT implementations of the HostnameVerifier only use the
// first CN provided. All other CNs are ignored.
// (Firefox, wget, curl, Sun Java 1.4, 5, 6 all work this way).
TreeSet names = new TreeSet();
if (cns != null && cns.length > 0 && cns[0] != null) {
names.add(cns[0]);
if (ie6) {
for (int i = 1; i < cns.length; i++) {
names.add(cns[i]);
}
}
}
if (subjectAlts != null) {
for (int i = 0; i < subjectAlts.length; i++) {
if (subjectAlts[i] != null) {
names.add(subjectAlts[i]);
}
}
}
if (names.isEmpty()) {
String msg = "Certificate for " + hosts[0] + " doesn't contain CN or DNS subjectAlt";
throw new SSLException(msg);
}
// StringBuffer for building the error message.
buf = new StringBuffer();
boolean match = false;
out:
for (Iterator it = names.iterator(); it.hasNext();) {
// Don't trim the CN, though!
String cn = (String) it.next();
cn = cn.toLowerCase();
// Store CN in StringBuffer in case we need to report an error.
buf.append(" <");
buf.append(cn);
buf.append('>');
if (it.hasNext()) {
buf.append(" OR");
}
// The CN better have at least two dots if it wants wildcard
// action. It also can't be [*.co.uk] or [*.co.jp] or
// [*.org.uk], etc...
boolean doWildcard = cn.startsWith("*.") &&
cn.lastIndexOf('.') >= 0 &&
!isIP4Address(cn) &&
acceptableCountryWildcard(cn);
for (int i = 0; i < hosts.length; i++) {
final String hostName = hosts[i].trim().toLowerCase();
if (doWildcard) {
match = hostName.endsWith(cn.substring(1));
if (match && strictWithSubDomains) {
// If we're in strict mode, then [*.foo.com] is not
// allowed to match [a.b.foo.com]
match = countDots(hostName) == countDots(cn);
}
} else {
match = hostName.equals(cn);
}
if (match) {
break out;
}
}
}
if (!match) {
throw new SSLException("hostname in certificate didn't match: " + hostnames + " !=" + buf);
}
}
public static boolean isIP4Address(final String cn) {
boolean isIP4 = true;
String tld = cn;
int x = cn.lastIndexOf('.');
// We only bother analyzing the characters after the final dot
// in the name.
if (x >= 0 && x + 1 < cn.length()) {
tld = cn.substring(x + 1);
}
for (int i = 0; i < tld.length(); i++) {
if (!Character.isDigit(tld.charAt(0))) {
isIP4 = false;
break;
}
}
return isIP4;
}
public static boolean acceptableCountryWildcard(final String cn) {
int cnLen = cn.length();
if (cnLen >= 7 && cnLen <= 9) {
// Look for the '.' in the 3rd-last position:
if (cn.charAt(cnLen - 3) == '.') {
// Trim off the [*.] and the [.XX].
String s = cn.substring(2, cnLen - 3);
// And test against the sorted array of bad 2lds:
int x = Arrays.binarySearch(BAD_COUNTRY_2LDS, s);
return x < 0;
}
}
return true;
}
public static boolean isLocalhost(String host) {
host = host != null ? host.trim().toLowerCase() : "";
if (host.startsWith("::1")) {
int x = host.lastIndexOf('%');
if (x >= 0) {
host = host.substring(0, x);
}
}
int x = Arrays.binarySearch(LOCALHOSTS, host);
return x >= 0;
}
/**
* Counts the number of dots "." in a string.
*
* @param s string to count dots from
* @return number of dots
*/
public static int countDots(final String s) {
int count = 0;
for (int i = 0; i < s.length(); i++) {
if (s.charAt(i) == '.') {
count++;
}
}
return count;
}
}
@SuppressWarnings("unchecked")
static class Certificates {
public static String[] getCNs(X509Certificate cert) {
LinkedList cnList = new LinkedList();
/*
Sebastian Hauer's original StrictSSLProtocolSocketFactory used
getName() and had the following comment:
Parses a X.500 distinguished name for the value of the
"Common Name" field. This is done a bit sloppy right
now and should probably be done a bit more according to
<code>RFC 2253</code>.
I've noticed that toString() seems to do a better job than
getName() on these X500Principal objects, so I'm hoping that
addresses Sebastian's concern.
For example, getName() gives me this:
1.2.840.113549.1.9.1=#16166a756c6975736461766965734063756362632e636f6d
whereas toString() gives me this:
EMAILADDRESS=juliusdavies@cucbc.com
Looks like toString() even works with non-ascii domain names!
I tested it with "&#x82b1;&#x5b50;.co.jp" and it worked fine.
*/
String subjectPrincipal = cert.getSubjectX500Principal().toString();
StringTokenizer st = new StringTokenizer(subjectPrincipal, ",");
while (st.hasMoreTokens()) {
String tok = st.nextToken();
int x = tok.indexOf("CN=");
if (x >= 0) {
cnList.add(tok.substring(x + 3));
}
}
if (!cnList.isEmpty()) {
String[] cns = new String[cnList.size()];
cnList.toArray(cns);
return cns;
} else {
return null;
}
}
/**
* Extracts the array of SubjectAlt DNS names from an X509Certificate.
* Returns null if there aren't any.
* <p/>
* Note: Java doesn't appear able to extract international characters
* from the SubjectAlts. It can only extract international characters
* from the CN field.
* <p/>
* (Or maybe the version of OpenSSL I'm using to test isn't storing the
* international characters correctly in the SubjectAlts?).
*
* @param cert X509Certificate
* @return Array of SubjectALT DNS names stored in the certificate.
*/
public static String[] getDNSSubjectAlts(X509Certificate cert) {
LinkedList subjectAltList = new LinkedList();
Collection c = null;
try {
c = cert.getSubjectAlternativeNames();
}
catch (CertificateParsingException cpe) {
// Should probably log.debug() this?
cpe.printStackTrace();
}
if (c != null) {
Iterator it = c.iterator();
while (it.hasNext()) {
List list = (List) it.next();
int type = ((Integer) list.get(0)).intValue();
// If type is 2, then we've got a dNSName
if (type == 2) {
String s = (String) list.get(1);
subjectAltList.add(s);
}
}
}
if (!subjectAltList.isEmpty()) {
String[] subjectAlts = new String[subjectAltList.size()];
subjectAltList.toArray(subjectAlts);
return subjectAlts;
} else {
return null;
}
}
}
}

View File

@ -1026,4 +1026,51 @@
<name>hadoop.http.staticuser.user</name>
<value>dr.who</value>
</property>
<!-- SSLFactory configuration -->
<property>
<name>hadoop.ssl.keystores.factory.class</name>
<value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
<description>
The keystores factory to use for retrieving certificates.
</description>
</property>
<property>
<name>hadoop.ssl.require.client.cert</name>
<value>false</value>
<description>Whether client certificates are required</description>
</property>
<property>
<name>hadoop.ssl.hostname.verifier</name>
<value>DEFAULT</value>
<description>
The hostname verifier to provide for HttpsURLConnections.
Valid values are: DEFAULT, STRICT, STRICT_I6, DEFAULT_AND_LOCALHOST and
ALLOW_ALL
</description>
</property>
<property>
<name>hadoop.ssl.server.conf</name>
<value>ssl-server.xml</value>
<description>
Resource file from which ssl server keystore information will be extracted.
This file is looked up in the classpath, typically it should be in Hadoop
conf/ directory.
</description>
</property>
<property>
<name>hadoop.ssl.client.conf</name>
<value>ssl-client.xml</value>
<description>
Resource file from which ssl client keystore information will be extracted
This file is looked up in the classpath, typically it should be in Hadoop
conf/ directory.
</description>
</property>
</configuration>

View File

@ -0,0 +1,270 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.conf.Configuration;
import sun.security.x509.AlgorithmId;
import sun.security.x509.CertificateAlgorithmId;
import sun.security.x509.CertificateIssuerName;
import sun.security.x509.CertificateSerialNumber;
import sun.security.x509.CertificateSubjectName;
import sun.security.x509.CertificateValidity;
import sun.security.x509.CertificateVersion;
import sun.security.x509.CertificateX509Key;
import sun.security.x509.X500Name;
import sun.security.x509.X509CertImpl;
import sun.security.x509.X509CertInfo;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.math.BigInteger;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class KeyStoreTestUtil {
public static String getClasspathDir(Class klass) throws Exception {
String file = klass.getName();
file = file.replace('.', '/') + ".class";
URL url = Thread.currentThread().getContextClassLoader().getResource(file);
String baseDir = url.toURI().getPath();
baseDir = baseDir.substring(0, baseDir.length() - file.length() - 1);
return baseDir;
}
/**
* Create a self-signed X.509 Certificate.
* From http://bfo.com/blog/2011/03/08/odds_and_ends_creating_a_new_x_509_certificate.html.
*
* @param dn the X.509 Distinguished Name, eg "CN=Test, L=London, C=GB"
* @param pair the KeyPair
* @param days how many days from now the Certificate is valid for
* @param algorithm the signing algorithm, eg "SHA1withRSA"
* @return the self-signed certificate
* @throws IOException thrown if an IO error ocurred.
* @throws GeneralSecurityException thrown if an Security error ocurred.
*/
public static X509Certificate generateCertificate(String dn, KeyPair pair,
int days, String algorithm)
throws GeneralSecurityException, IOException {
PrivateKey privkey = pair.getPrivate();
X509CertInfo info = new X509CertInfo();
Date from = new Date();
Date to = new Date(from.getTime() + days * 86400000l);
CertificateValidity interval = new CertificateValidity(from, to);
BigInteger sn = new BigInteger(64, new SecureRandom());
X500Name owner = new X500Name(dn);
info.set(X509CertInfo.VALIDITY, interval);
info.set(X509CertInfo.SERIAL_NUMBER, new CertificateSerialNumber(sn));
info.set(X509CertInfo.SUBJECT, new CertificateSubjectName(owner));
info.set(X509CertInfo.ISSUER, new CertificateIssuerName(owner));
info.set(X509CertInfo.KEY, new CertificateX509Key(pair.getPublic()));
info
.set(X509CertInfo.VERSION, new CertificateVersion(CertificateVersion.V3));
AlgorithmId algo = new AlgorithmId(AlgorithmId.md5WithRSAEncryption_oid);
info.set(X509CertInfo.ALGORITHM_ID, new CertificateAlgorithmId(algo));
// Sign the cert to identify the algorithm that's used.
X509CertImpl cert = new X509CertImpl(info);
cert.sign(privkey, algorithm);
// Update the algorith, and resign.
algo = (AlgorithmId) cert.get(X509CertImpl.SIG_ALG);
info
.set(CertificateAlgorithmId.NAME + "." + CertificateAlgorithmId.ALGORITHM,
algo);
cert = new X509CertImpl(info);
cert.sign(privkey, algorithm);
return cert;
}
public static KeyPair generateKeyPair(String algorithm)
throws NoSuchAlgorithmException {
KeyPairGenerator keyGen = KeyPairGenerator.getInstance(algorithm);
keyGen.initialize(1024);
return keyGen.genKeyPair();
}
private static KeyStore createEmptyKeyStore()
throws GeneralSecurityException, IOException {
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(null, null); // initialize
return ks;
}
private static void saveKeyStore(KeyStore ks, String filename,
String password)
throws GeneralSecurityException, IOException {
FileOutputStream out = new FileOutputStream(filename);
try {
ks.store(out, password.toCharArray());
} finally {
out.close();
}
}
public static void createKeyStore(String filename,
String password, String alias,
Key privateKey, Certificate cert)
throws GeneralSecurityException, IOException {
KeyStore ks = createEmptyKeyStore();
ks.setKeyEntry(alias, privateKey, password.toCharArray(),
new Certificate[]{cert});
saveKeyStore(ks, filename, password);
}
public static void createTrustStore(String filename,
String password, String alias,
Certificate cert)
throws GeneralSecurityException, IOException {
KeyStore ks = createEmptyKeyStore();
ks.setCertificateEntry(alias, cert);
saveKeyStore(ks, filename, password);
}
public static <T extends Certificate> void createTrustStore(
String filename, String password, Map<String, T> certs)
throws GeneralSecurityException, IOException {
KeyStore ks = createEmptyKeyStore();
for (Map.Entry<String, T> cert : certs.entrySet()) {
ks.setCertificateEntry(cert.getKey(), cert.getValue());
}
saveKeyStore(ks, filename, password);
}
public static void cleanupSSLConfig(String keystoresDir, String sslConfDir)
throws Exception {
File f = new File(keystoresDir + "/clientKS.jks");
f.delete();
f = new File(keystoresDir + "/serverKS.jks");
f.delete();
f = new File(keystoresDir + "/trustKS.jks");
f.delete();
f = new File(sslConfDir + "/ssl-client.xml");
f.delete();
f = new File(sslConfDir + "/ssl-server.xml");
f.delete();
}
public static void setupSSLConfig(String keystoresDir, String sslConfDir,
Configuration conf, boolean useClientCert)
throws Exception {
String clientKS = keystoresDir + "/clientKS.jks";
String clientPassword = "clientP";
String serverKS = keystoresDir + "/serverKS.jks";
String serverPassword = "serverP";
String trustKS = keystoresDir + "/trustKS.jks";
String trustPassword = "trustP";
File sslClientConfFile = new File(sslConfDir + "/ssl-client.xml");
File sslServerConfFile = new File(sslConfDir + "/ssl-server.xml");
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
if (useClientCert) {
KeyPair cKP = KeyStoreTestUtil.generateKeyPair("RSA");
X509Certificate cCert =
KeyStoreTestUtil.generateCertificate("CN=localhost, O=client", cKP, 30,
"SHA1withRSA");
KeyStoreTestUtil.createKeyStore(clientKS, clientPassword, "client",
cKP.getPrivate(), cCert);
certs.put("client", cCert);
}
KeyPair sKP = KeyStoreTestUtil.generateKeyPair("RSA");
X509Certificate sCert =
KeyStoreTestUtil.generateCertificate("CN=localhost, O=server", sKP, 30,
"SHA1withRSA");
KeyStoreTestUtil.createKeyStore(serverKS, serverPassword, "server",
sKP.getPrivate(), sCert);
certs.put("server", sCert);
KeyStoreTestUtil.createTrustStore(trustKS, trustPassword, certs);
Configuration clientSSLConf = new Configuration(false);
clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.CLIENT,
FileBasedKeyStoresFactory.SSL_KEYSTORE_LOCATION_TPL_KEY), clientKS);
clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.CLIENT,
FileBasedKeyStoresFactory.SSL_KEYSTORE_PASSWORD_TPL_KEY), clientPassword);
clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.CLIENT,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_LOCATION_TPL_KEY), trustKS);
clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.CLIENT,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_PASSWORD_TPL_KEY), trustPassword);
clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.CLIENT,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY), "1000");
Configuration serverSSLConf = new Configuration(false);
serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.SERVER,
FileBasedKeyStoresFactory.SSL_KEYSTORE_LOCATION_TPL_KEY), serverKS);
serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.SERVER,
FileBasedKeyStoresFactory.SSL_KEYSTORE_PASSWORD_TPL_KEY), serverPassword);
serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.SERVER,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_LOCATION_TPL_KEY), trustKS);
serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.SERVER,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_PASSWORD_TPL_KEY), trustPassword);
serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName(
SSLFactory.Mode.SERVER,
FileBasedKeyStoresFactory.SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY), "1000");
Writer writer = new FileWriter(sslClientConfFile);
try {
clientSSLConf.writeXml(writer);
} finally {
writer.close();
}
writer = new FileWriter(sslServerConfFile);
try {
serverSSLConf.writeXml(writer);
} finally {
writer.close();
}
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "ALLOW_ALL");
conf.set(SSLFactory.SSL_CLIENT_CONF_KEY, sslClientConfFile.getName());
conf.set(SSLFactory.SSL_SERVER_CONF_KEY, sslServerConfFile.getName());
conf.setBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY, useClientCert);
}
}

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.fs.FileUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.KeyPair;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.createTrustStore;
import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.generateCertificate;
import static org.apache.hadoop.security.ssl.KeyStoreTestUtil.generateKeyPair;
public class TestReloadingX509TrustManager {
private static final String BASEDIR =
System.getProperty("test.build.data", "target/test-dir") + "/" +
TestReloadingX509TrustManager.class.getSimpleName();
private X509Certificate cert1;
private X509Certificate cert2;
@BeforeClass
public static void setUp() throws Exception {
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
}
@Test(expected = IOException.class)
public void testLoadMissingTrustStore() throws Exception {
String truststoreLocation = BASEDIR + "/testmissing.jks";
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
} finally {
tm.destroy();
}
}
@Test(expected = IOException.class)
public void testLoadCorruptTrustStore() throws Exception {
String truststoreLocation = BASEDIR + "/testcorrupt.jks";
OutputStream os = new FileOutputStream(truststoreLocation);
os.write(1);
os.close();
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
} finally {
tm.destroy();
}
}
@Test
public void testReload() throws Exception {
KeyPair kp = generateKeyPair("RSA");
cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
String truststoreLocation = BASEDIR + "/testreload.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
assertEquals(1, tm.getAcceptedIssuers().length);
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 1000));
// Add another cert
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
certs.put("cert1", cert1);
certs.put("cert2", cert2);
createTrustStore(truststoreLocation, "password", certs);
// and wait to be sure reload has taken place
assertEquals(10, tm.getReloadInterval());
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 200));
assertEquals(2, tm.getAcceptedIssuers().length);
} finally {
tm.destroy();
}
}
@Test
public void testReloadMissingTrustStore() throws Exception {
KeyPair kp = generateKeyPair("RSA");
cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
String truststoreLocation = BASEDIR + "/testmissing.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
assertEquals(1, tm.getAcceptedIssuers().length);
X509Certificate cert = tm.getAcceptedIssuers()[0];
new File(truststoreLocation).delete();
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 200));
assertEquals(1, tm.getAcceptedIssuers().length);
assertEquals(cert, tm.getAcceptedIssuers()[0]);
} finally {
tm.destroy();
}
}
@Test
public void testReloadCorruptTrustStore() throws Exception {
KeyPair kp = generateKeyPair("RSA");
cert1 = generateCertificate("CN=Cert1", kp, 30, "SHA1withRSA");
cert2 = generateCertificate("CN=Cert2", kp, 30, "SHA1withRSA");
String truststoreLocation = BASEDIR + "/testcorrupt.jks";
createTrustStore(truststoreLocation, "password", "cert1", cert1);
ReloadingX509TrustManager tm =
new ReloadingX509TrustManager("jks", truststoreLocation, "password", 10);
try {
tm.init();
assertEquals(1, tm.getAcceptedIssuers().length);
X509Certificate cert = tm.getAcceptedIssuers()[0];
OutputStream os = new FileOutputStream(truststoreLocation);
os.write(1);
os.close();
new File(truststoreLocation).setLastModified(System.currentTimeMillis() -
1000);
// Wait so that the file modification time is different
Thread.sleep((tm.getReloadInterval() + 200));
assertEquals(1, tm.getAcceptedIssuers().length);
assertEquals(cert, tm.getAcceptedIssuers()[0]);
} finally {
tm.destroy();
}
}
}

View File

@ -0,0 +1,164 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.ssl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.net.URL;
import java.security.GeneralSecurityException;
public class TestSSLFactory {
private static final String BASEDIR =
System.getProperty("test.build.dir", "target/test-dir") + "/" +
TestSSLFactory.class.getSimpleName();
@BeforeClass
public static void setUp() throws Exception {
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
}
private Configuration createConfiguration(boolean clientCert)
throws Exception {
Configuration conf = new Configuration();
String keystoresDir = new File(BASEDIR).getAbsolutePath();
String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSSLFactory.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf, clientCert);
return conf;
}
@After
@Before
public void cleanUp() throws Exception {
String keystoresDir = new File(BASEDIR).getAbsolutePath();
String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSSLFactory.class);
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfsDir);
}
@Test(expected = IllegalStateException.class)
public void clientMode() throws Exception {
Configuration conf = createConfiguration(false);
SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
sslFactory.init();
Assert.assertNotNull(sslFactory.createSSLSocketFactory());
Assert.assertNotNull(sslFactory.getHostnameVerifier());
sslFactory.createSSLServerSocketFactory();
} finally {
sslFactory.destroy();
}
}
private void serverMode(boolean clientCert, boolean socket) throws Exception {
Configuration conf = createConfiguration(clientCert);
SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
try {
sslFactory.init();
Assert.assertNotNull(sslFactory.createSSLServerSocketFactory());
Assert.assertEquals(clientCert, sslFactory.isClientCertRequired());
if (socket) {
sslFactory.createSSLSocketFactory();
} else {
sslFactory.getHostnameVerifier();
}
} finally {
sslFactory.destroy();
}
}
@Test(expected = IllegalStateException.class)
public void serverModeWithoutClientCertsSocket() throws Exception {
serverMode(false, true);
}
@Test(expected = IllegalStateException.class)
public void serverModeWithClientCertsSocket() throws Exception {
serverMode(true, true);
}
@Test(expected = IllegalStateException.class)
public void serverModeWithoutClientCertsVerifier() throws Exception {
serverMode(false, false);
}
@Test(expected = IllegalStateException.class)
public void serverModeWithClientCertsVerifier() throws Exception {
serverMode(true, false);
}
@Test
public void validHostnameVerifier() throws Exception {
Configuration conf = createConfiguration(false);
conf.unset(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY);
SSLFactory sslFactory = new
SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
Assert.assertEquals("DEFAULT", sslFactory.getHostnameVerifier().toString());
sslFactory.destroy();
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "ALLOW_ALL");
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
Assert.assertEquals("ALLOW_ALL",
sslFactory.getHostnameVerifier().toString());
sslFactory.destroy();
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT_AND_LOCALHOST");
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
Assert.assertEquals("DEFAULT_AND_LOCALHOST",
sslFactory.getHostnameVerifier().toString());
sslFactory.destroy();
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "STRICT");
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
Assert.assertEquals("STRICT", sslFactory.getHostnameVerifier().toString());
sslFactory.destroy();
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "STRICT_IE6");
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
sslFactory.init();
Assert.assertEquals("STRICT_IE6",
sslFactory.getHostnameVerifier().toString());
sslFactory.destroy();
}
@Test(expected = GeneralSecurityException.class)
public void invalidHostnameVerifier() throws Exception {
Configuration conf = createConfiguration(false);
conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "foo");
SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
sslFactory.init();
} finally {
sslFactory.destroy();
}
}
}

View File

@ -135,6 +135,8 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-987. Exposing MiniDFS and MiniMR clusters as a single process
command-line. (ahmed via tucu)
MAPREDUCE-4417. add support for encrypted shuffle (tucu)
IMPROVEMENTS
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved

View File

@ -473,5 +473,10 @@
<!--
The above 2 fields are accessed locally and only via methods that are synchronized.
-->
<Match>
<Class name="org.apache.hadoop.mapred.ShuffleHandler" />
<Field name="sslFileBufferSize" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TypeConverter;
@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@ -108,7 +110,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private long scheduledTime;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected boolean encryptedShuffle;
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
@ -274,6 +277,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
this.jobToken = jobToken;
this.metrics = metrics;
this.appContext = appContext;
this.encryptedShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
// See if this is from a previous generation.
if (completedTasksFromPreviousRun != null
@ -637,9 +642,10 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(-1);
tce.setMapOutputServerAddress("http://"
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort());
String scheme = (encryptedShuffle) ? "https://" : "http://";
tce.setMapOutputServerAddress(scheme
+ attempt.getNodeHttpAddress().split(":")[0] + ":"
+ attempt.getShufflePort());
tce.setStatus(status);
tce.setAttemptId(attempt.getID());
int runTime = 0;

View File

@ -79,4 +79,9 @@ public interface MRConfig {
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
public static final String MAX_BLOCK_LOCATIONS_KEY =
"mapreduce.job.max.split.locations";
public static final String SHUFFLE_SSL_ENABLED_KEY =
"mapreduce.shuffle.ssl.enabled";
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
}

View File

@ -25,11 +25,13 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.net.HttpURLConnection;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -42,9 +44,11 @@ import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@ -92,6 +96,9 @@ class Fetcher<K,V> extends Thread {
private volatile boolean stopped = false;
private static boolean sslShuffle;
private static SSLFactory sslFactory;
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
@ -135,6 +142,20 @@ class Fetcher<K,V> extends Thread {
setName("fetcher#" + id);
setDaemon(true);
synchronized (Fetcher.class) {
sslShuffle = job.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);
if (sslShuffle && sslFactory == null) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, job);
try {
sslFactory.init();
} catch (Exception ex) {
sslFactory.destroy();
throw new RuntimeException(ex);
}
}
}
}
public void run() {
@ -173,8 +194,25 @@ class Fetcher<K,V> extends Thread {
} catch (InterruptedException ie) {
LOG.warn("Got interrupt while joining " + getName(), ie);
}
if (sslFactory != null) {
sslFactory.destroy();
}
}
protected HttpURLConnection openConnection(URL url) throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (sslShuffle) {
HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
try {
httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
} catch (GeneralSecurityException ex) {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
}
return conn;
}
/**
* The crux of the matter...
*
@ -205,7 +243,7 @@ class Fetcher<K,V> extends Thread {
try {
URL url = getMapOutputURL(host, maps);
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
HttpURLConnection connection = openConnection(url);
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);

View File

@ -512,6 +512,21 @@
single shuffle can consume</description>
</property>
<property>
<name>mapreduce.shuffle.ssl.enabled</name>
<value>false</value>
<description>
Whether to use SSL for for the Shuffle HTTP endpoints.
</description>
</property>
<property>
<name>mapreduce.shuffle.ssl.file.buffer.size</name>
<value>65536</value>
<description>Buffer size for reading spills from file when using SSL.
</description>
</property>
<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.security.ssl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Assert;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URL;
public class TestEncryptedShuffle {
private static final String BASEDIR =
System.getProperty("test.build.dir", "target/test-dir") + "/" +
TestEncryptedShuffle.class.getSimpleName();
@BeforeClass
public static void setUp() throws Exception {
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
}
@Before
public void createCustomYarnClasspath() throws Exception {
String classpathDir =
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
URL url = Thread.currentThread().getContextClassLoader().
getResource("mrapp-generated-classpath");
File f = new File(url.getPath());
BufferedReader reader = new BufferedReader(new FileReader(f));
String cp = reader.readLine();
cp = cp + ":" + classpathDir;
f = new File(classpathDir, "mrapp-generated-classpath");
Writer writer = new FileWriter(f);
writer.write(cp);
writer.close();
new File(classpathDir, "core-site.xml").delete();
}
@After
public void cleanUpMiniClusterSpecialConfig() throws Exception {
String classpathDir =
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
new File(classpathDir, "mrapp-generated-classpath").delete();
new File(classpathDir, "core-site.xml").delete();
String keystoresDir = new File(BASEDIR).getAbsolutePath();
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, classpathDir);
}
private MiniDFSCluster dfsCluster = null;
private MiniMRClientCluster mrCluster = null;
private void startCluster(Configuration conf) throws Exception {
if (System.getProperty("hadoop.log.dir") == null) {
System.setProperty("hadoop.log.dir", "target/test-dir");
}
conf.set("dfs.block.access.token.enable", "false");
conf.set("dfs.permissions", "true");
conf.set("hadoop.security.authentication", "simple");
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSystem = dfsCluster.getFileSystem();
fileSystem.mkdirs(new Path("/tmp"));
fileSystem.mkdirs(new Path("/user"));
fileSystem.mkdirs(new Path("/hadoop/mapred/system"));
fileSystem.setPermission(
new Path("/tmp"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(
new Path("/user"), FsPermission.valueOf("-rwxrwxrwx"));
fileSystem.setPermission(
new Path("/hadoop/mapred/system"), FsPermission.valueOf("-rwx------"));
FileSystem.setDefaultUri(conf, fileSystem.getUri());
mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
// so the minicluster conf is avail to the containers.
String classpathDir =
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
Writer writer = new FileWriter(classpathDir + "/core-site.xml");
mrCluster.getConfig().writeXml(writer);
writer.close();
}
private void stopCluster() throws Exception {
if (mrCluster != null) {
mrCluster.stop();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
protected JobConf getJobConf() throws IOException {
return new JobConf(mrCluster.getConfig());
}
private void encryptedShuffleWithCerts(boolean useClientCerts)
throws Exception {
try {
Configuration conf = new Configuration();
String keystoresDir = new File(BASEDIR).getAbsolutePath();
String sslConfsDir =
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf,
useClientCerts);
conf.setBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, true);
startCluster(conf);
FileSystem fs = FileSystem.get(getJobConf());
Path inputDir = new Path("input");
fs.mkdirs(inputDir);
Writer writer =
new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
writer.write("hello");
writer.close();
Path outputDir = new Path("output", "output");
JobConf jobConf = new JobConf(getJobConf());
jobConf.setInt("mapred.map.tasks", 1);
jobConf.setInt("mapred.map.max.attempts", 1);
jobConf.setInt("mapred.reduce.max.attempts", 1);
jobConf.set("mapred.input.dir", inputDir.toString());
jobConf.set("mapred.output.dir", outputDir.toString());
JobClient jobClient = new JobClient(jobConf);
RunningJob runJob = jobClient.submitJob(jobConf);
runJob.waitForCompletion();
Assert.assertTrue(runJob.isComplete());
Assert.assertTrue(runJob.isSuccessful());
} finally {
stopCluster();
}
}
@Test
public void encryptedShuffleWithClientCerts() throws Exception {
encryptedShuffleWithCerts(true);
}
@Test
public void encryptedShuffleWithoutClientCerts() throws Exception {
encryptedShuffleWithCerts(false);
}
}

View File

@ -55,7 +55,9 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@ -101,6 +103,8 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedFile;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
@ -114,6 +118,8 @@ public class ShuffleHandler extends AbstractService
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
private HttpPipelineFactory pipelineFact;
private int sslFileBufferSize;
public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce.shuffle";
@ -126,6 +132,11 @@ public class ShuffleHandler extends AbstractService
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 8080;
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"mapreduce.shuffle.ssl.file.buffer.size";
public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@Metric("Shuffle output in bytes")
@ -249,7 +260,11 @@ public class ShuffleHandler extends AbstractService
public synchronized void start() {
Configuration conf = getConfig();
ServerBootstrap bootstrap = new ServerBootstrap(selector);
HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
try {
pipelineFact = new HttpPipelineFactory(conf);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
bootstrap.setPipelineFactory(pipelineFact);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
Channel ch = bootstrap.bind(new InetSocketAddress(port));
@ -259,6 +274,9 @@ public class ShuffleHandler extends AbstractService
pipelineFact.SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
super.start();
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
}
@Override
@ -266,6 +284,7 @@ public class ShuffleHandler extends AbstractService
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
bootstrap.releaseExternalResources();
pipelineFact.destroy();
super.stop();
}
@ -283,22 +302,38 @@ public class ShuffleHandler extends AbstractService
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
private SSLFactory sslFactory;
public HttpPipelineFactory(Configuration conf) {
public HttpPipelineFactory(Configuration conf) throws Exception {
SHUFFLE = new Shuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
}
public void destroy() {
if (sslFactory != null) {
sslFactory.destroy();
}
}
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new HttpRequestDecoder(),
new HttpChunkAggregator(1 << 16),
new HttpResponseEncoder(),
new ChunkedWriteHandler(),
SHUFFLE);
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
// TODO factor out decode of index to permit alt. models
ChannelPipeline pipeline = Channels.pipeline();
if (sslFactory != null) {
pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
}
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", SHUFFLE);
return pipeline;
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
// TODO factor out decode of index to permit alt. models
}
}
@ -483,17 +518,25 @@ public class ShuffleHandler extends AbstractService
LOG.info(spillfile + " not found");
return null;
}
final FileRegion partition = new DefaultFileRegion(
spill.getChannel(), info.startOffset, info.partLength);
ChannelFuture writeFuture = ch.write(partition);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,
// attribute to appropriate spill output
@Override
public void operationComplete(ChannelFuture future) {
partition.releaseExternalResources();
}
});
ChannelFuture writeFuture;
if (ch.getPipeline().get(SslHandler.class) == null) {
final FileRegion partition = new DefaultFileRegion(
spill.getChannel(), info.startOffset, info.partLength);
writeFuture = ch.write(partition);
writeFuture.addListener(new ChannelFutureListener() {
// TODO error handling; distinguish IO/connection failures,
// attribute to appropriate spill output
@Override
public void operationComplete(ChannelFuture future) {
partition.releaseExternalResources();
}
});
} else {
// HTTPS cannot be done with zero copy.
writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
info.partLength,
sslFileBufferSize));
}
metrics.shuffleConnections.incr();
metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
return writeFuture;

View File

@ -0,0 +1,320 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
Hadoop Map Reduce Next Generation-${project.version} - Encrypted Shuffle
---
---
${maven.build.timestamp}
Hadoop MapReduce Next Generation - Encrypted Shuffle
\[ {{{./index.html}Go Back}} \]
* {Introduction}
The Encrypted Shuffle capability allows encryption of the MapReduce shuffle
using HTTPS and with optional client authentication (also known as
bi-directional HTTPS, or HTTPS with client certificates). It comprises:
* A Hadoop configuration setting for toggling the shuffle between HTTP and
HTTPS.
* A Hadoop configuration settings for specifying the keystore and truststore
properties (location, type, passwords) used by the shuffle service and the
reducers tasks fetching shuffle data.
* A way to re-load truststores across the cluster (when a node is added or
removed).
* {Configuration}
** <<core-site.xml>> Properties
To enable encrypted shuffle, set the following properties in core-site.xml of
all nodes in the cluster:
*--------------------------------------+---------------------+-----------------+
| <<Property>> | <<Default Value>> | <<Explanation>> |
*--------------------------------------+---------------------+-----------------+
| <<<hadoop.ssl.require.client.cert>>> | <<<false>>> | Whether client certificates are required |
*--------------------------------------+---------------------+-----------------+
| <<<hadoop.ssl.hostname.verifier>>> | <<<DEFAULT>>> | The hostname verifier to provide for HttpsURLConnections. Valid values are: <<DEFAULT>>, <<STRICT>>, <<STRICT_I6>>, <<DEFAULT_AND_LOCALHOST>> and <<ALLOW_ALL>> |
*--------------------------------------+---------------------+-----------------+
| <<<hadoop.ssl.keystores.factory.class>>> | <<<org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory>>> | The KeyStoresFactory implementation to use |
*--------------------------------------+---------------------+-----------------+
| <<<hadoop.ssl.server.conf>>> | <<<ss-server.xml>>> | Resource file from which ssl server keystore information will be extracted. This file is looked up in the classpath, typically it should be in Hadoop conf/ directory |
*--------------------------------------+---------------------+-----------------+
| <<<hadoop.ssl.client.conf>>> | <<<ss-client.xml>>> | Resource file from which ssl server keystore information will be extracted. This file is looked up in the classpath, typically it should be in Hadoop conf/ directory |
*--------------------------------------+---------------------+-----------------+
<<IMPORTANT:>> Currently requiring client certificates should be set to false.
Refer the {{{ClientCertificates}Client Certificates}} section for details.
<<IMPORTANT:>> All these properties should be marked as final in the cluster
configuration files.
*** Example:
------
...
<property>
<name>hadoop.ssl.require.client.cert</name>
<value>false</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.hostname.verifier</name>
<value>DEFAULT</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.keystores.factory.class</name>
<value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.server.conf</name>
<value>ssl-server.xml</value>
<final>true</final>
</property>
<property>
<name>hadoop.ssl.client.conf</name>
<value>ssl-client.xml</value>
<final>true</final>
</property>
...
------
** <<<mapred-site.xml>>> Properties
To enable encrypted shuffle, set the following property in mapred-site.xml
of all nodes in the cluster:
*--------------------------------------+---------------------+-----------------+
| <<Property>> | <<Default Value>> | <<Explanation>> |
*--------------------------------------+---------------------+-----------------+
| <<<mapreduce.shuffle.ssl.enabled>>> | <<<false>>> | Whether encrypted shuffle is enabled |
*--------------------------------------+---------------------+-----------------+
<<IMPORTANT:>> This property should be marked as final in the cluster
configuration files.
*** Example:
------
...
<property>
<name>mapreduce.shuffle.ssl.enabled</name>
<value>true</value>
<final>true</final>
</property>
...
------
The Linux container executor should be set to prevent job tasks from
reading the server keystore information and gaining access to the shuffle
server certificates.
Refer to Hadoop Kerberos configuration for details on how to do this.
* {Keystore and Truststore Settings}
Currently <<<FileBasedKeyStoresFactory>>> is the only <<<KeyStoresFactory>>>
implementation. The <<<FileBasedKeyStoresFactory>>> implementation uses the
following properties, in the <<ssl-server.xml>> and <<ssl-client.xml>> files,
to configure the keystores and truststores.
** <<<ssl-server.xml>>> (Shuffle server) Configuration:
The mapred user should own the <<ssl-server.xml>> file and have exclusive
read access to it.
*---------------------------------------------+---------------------+-----------------+
| <<Property>> | <<Default Value>> | <<Explanation>> |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.keystore.type>>> | <<<jks>>> | Keystore file type |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and have exclusive read access to it. |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.keystore.password>>> | NONE | Keystore file password |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.truststore.type>>> | <<<jks>>> | Truststore file type |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and have exclusive read access to it. |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.truststore.password>>> | NONE | Truststore file password |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.server.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
*--------------------------------------+----------------------------+-----------------+
*** Example:
------
<configuration>
<!-- Server Certificate Store -->
<property>
<name>ssl.server.keystore.type</name>
<value>jks</value>
</property>
<property>
<name>ssl.server.keystore.location</name>
<value>${user.home}/keystores/server-keystore.jks</value>
</property>
<property>
<name>ssl.server.keystore.password</name>
<value>serverfoo</value>
</property>
<!-- Server Trust Store -->
<property>
<name>ssl.server.truststore.type</name>
<value>jks</value>
</property>
<property>
<name>ssl.server.truststore.location</name>
<value>${user.home}/keystores/truststore.jks</value>
</property>
<property>
<name>ssl.server.truststore.password</name>
<value>clientserverbar</value>
</property>
<property>
<name>ssl.server.truststore.reload.interval</name>
<value>10000</value>
</property>
</configuration>
------
** <<<ssl-client.xml>>> (Reducer/Fetcher) Configuration:
The mapred user should own the <<ssl-server.xml>> file and it should have
default permissions.
*---------------------------------------------+---------------------+-----------------+
| <<Property>> | <<Default Value>> | <<Explanation>> |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.keystore.type>>> | <<<jks>>> | Keystore file type |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.keystore.location>>> | NONE | Keystore file location. The mapred user should own this file and it should have default permissions. |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.keystore.password>>> | NONE | Keystore file password |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.truststore.type>>> | <<<jks>>> | Truststore file type |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.truststore.location>>> | NONE | Truststore file location. The mapred user should own this file and it should have default permissions. |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.truststore.password>>> | NONE | Truststore file password |
*---------------------------------------------+---------------------+-----------------+
| <<<ssl.client.truststore.reload.interval>>> | 10000 | Truststore reload interval, in milliseconds |
*--------------------------------------+----------------------------+-----------------+
*** Example:
------
<configuration>
<!-- Client certificate Store -->
<property>
<name>ssl.client.keystore.type</name>
<value>jks</value>
</property>
<property>
<name>ssl.client.keystore.location</name>
<value>${user.home}/keystores/client-keystore.jks</value>
</property>
<property>
<name>ssl.client.keystore.password</name>
<value>clientfoo</value>
</property>
<!-- Client Trust Store -->
<property>
<name>ssl.client.truststore.type</name>
<value>jks</value>
</property>
<property>
<name>ssl.client.truststore.location</name>
<value>${user.home}/keystores/truststore.jks</value>
</property>
<property>
<name>ssl.client.truststore.password</name>
<value>clientserverbar</value>
</property>
<property>
<name>ssl.client.truststore.reload.interval</name>
<value>10000</value>
</property>
</configuration>
------
* Activating Encrypted Shuffle
When you have made the above configuration changes, activate Encrypted
Shuffle by re-starting all NodeManagers.
<<IMPORTANT:>> Using encrypted shuffle will incur in a significant
performance impact. Users should profile this and potentially reserve
1 or more cores for encrypted shuffle.
* {ClientCertificates} Client Certificates
Using Client Certificates does not fully ensure that the client is a
reducer task for the job. Currently, Client Certificates (their private key)
keystore files must be readable by all users submitting jobs to the cluster.
This means that a rogue job could read such those keystore files and use
the client certificates in them to establish a secure connection with a
Shuffle server. However, unless the rogue job has a proper JobToken, it won't
be able to retrieve shuffle data from the Shuffle server. A job, using its
own JobToken, can only retrieve shuffle data that belongs to itself.
* Reloading Truststores
By default the truststores will reload their configuration every 10 seconds.
If a new truststore file is copied over the old one, it will be re-read,
and its certificates will replace the old ones. This mechanism is useful for
adding or removing nodes from the cluster, or for adding or removing trusted
clients. In these cases, the client or NodeManager certificate is added to
(or removed from) all the truststore files in the system, and the new
configuration will be picked up without you having to restart the NodeManager
daemons.
* Debugging
<<NOTE:>> Enable debugging only for troubleshooting, and then only for jobs
running on small amounts of data. It is very verbose and slows down jobs by
several orders of magnitude. (You might need to increase mapred.task.timeout
to prevent jobs from failing because tasks run so slowly.)
To enable SSL debugging in the reducers, set <<<-Djavax.net.debug=all>>> in
the <<<mapreduce.reduce.child.java.opts>>> property; for example:
------
<property>
<name>mapred.reduce.child.java.opts</name>
<value>-Xmx-200m -Djavax.net.debug=all</value>
</property>
------
You can do this on a per-job basis, or by means of a cluster-wide setting in
the <<<mapred-site.xml>>> file.
To set this property in NodeManager, set it in the <<<yarn-env.sh>>> file:
------
YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all $YARN_NODEMANAGER_OPTS"
------

View File

@ -51,3 +51,5 @@ MapReduce NextGen aka YARN aka MRv2
* {{{./CLIMiniCluster.html}CLI MiniCluster}}
* {{{./EncryptedShuffle.html}Encrypted Shuffle}}

View File

@ -66,6 +66,7 @@
<item name="Writing Yarn Applications" href="hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"/>
<item name="Capacity Scheduler" href="hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html"/>
<item name="Web Application Proxy" href="hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html"/>
<item name="Encrypted Shuffle" href="hadoop-yarn/hadoop-yarn-site/EncryptedShuffle.html"/>
<item name="Yarn Commands" href="hadoop-yarn/hadoop-yarn-site/YarnCommands.html"/>
</menu>