diff --git a/hadoop-common-project/hadoop-auth/pom.xml b/hadoop-common-project/hadoop-auth/pom.xml index 564518c540..5f7d77434b 100644 --- a/hadoop-common-project/hadoop-auth/pom.xml +++ b/hadoop-common-project/hadoop-auth/pom.xml @@ -130,6 +130,19 @@ + + org.apache.zookeeper + zookeeper + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-test + test + diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java index 9330444c46..47cf54c606 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/server/AuthenticationFilter.java @@ -22,6 +22,7 @@ import org.apache.hadoop.security.authentication.util.RandomSignerSecretProvider; import org.apache.hadoop.security.authentication.util.SignerSecretProvider; import org.apache.hadoop.security.authentication.util.StringSignerSecretProvider; +import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,7 @@ /** * The {@link AuthenticationFilter} enables protecting web application resources with different (pluggable) - * authentication mechanisms. + * authentication mechanisms and signer secret providers. *

* Out of the box it provides 2 authentication mechanisms: Pseudo and Kerberos SPNEGO. *

@@ -60,10 +61,13 @@ *

  • [#PREFIX#.]type: simple|kerberos|#CLASS#, 'simple' is short for the * {@link PseudoAuthenticationHandler}, 'kerberos' is short for {@link KerberosAuthenticationHandler}, otherwise * the full class name of the {@link AuthenticationHandler} must be specified.
  • - *
  • [#PREFIX#.]signature.secret: the secret used to sign the HTTP cookie value. The default value is a random - * value. Unless multiple webapp instances need to share the secret the random value is adequate.
  • - *
  • [#PREFIX#.]token.validity: time -in seconds- that the generated token is valid before a - * new authentication is triggered, default value is 3600 seconds.
  • + *
  • [#PREFIX#.]signature.secret: when signer.secret.provider is set to + * "string" or not specified, this is the value for the secret used to sign the + * HTTP cookie.
  • + *
  • [#PREFIX#.]token.validity: time -in seconds- that the generated token is + * valid before a new authentication is triggered, default value is + * 3600 seconds. This is also used for the rollover interval for + * the "random" and "zookeeper" SignerSecretProviders.
  • *
  • [#PREFIX#.]cookie.domain: domain to use for the HTTP cookie that stores the authentication token.
  • *
  • [#PREFIX#.]cookie.path: path to use for the HTTP cookie that stores the authentication token.
  • * @@ -72,6 +76,49 @@ * {@link AuthenticationFilter} will take all the properties that start with the prefix #PREFIX#, it will remove * the prefix from it and it will pass them to the authentication handler for initialization. Properties that do * not start with the prefix will not be passed to the authentication handler initialization. + *

    + * Out of the box it provides 3 signer secret provider implementations: + * "string", "random", and "zookeeper" + *

    + * Additional signer secret providers are supported via the + * {@link SignerSecretProvider} class. + *

    + * For the HTTP cookies mentioned above, the SignerSecretProvider is used to + * determine the secret to use for signing the cookies. Different + * implementations can have different behaviors. The "string" implementation + * simply uses the string set in the [#PREFIX#.]signature.secret property + * mentioned above. The "random" implementation uses a randomly generated + * secret that rolls over at the interval specified by the + * [#PREFIX#.]token.validity mentioned above. The "zookeeper" implementation + * is like the "random" one, except that it synchronizes the random secret + * and rollovers between multiple servers; it's meant for HA services. + *
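As a rough sketch of what such a pluggable provider might look like (the class name, package, and "my.fixed.secret" property below are hypothetical and not part of this patch), using the three-argument init signature this patch introduces:

  // Hypothetical custom provider; only the SignerSecretProvider API shown in
  // this patch is assumed (init/getCurrentSecret/getAllSecrets).
  package org.example.auth;

  import java.util.Properties;
  import javax.servlet.ServletContext;
  import org.apache.hadoop.security.authentication.util.SignerSecretProvider;

  public class FixedSecretProvider extends SignerSecretProvider {
    private byte[] secret;

    @Override
    public void init(Properties config, ServletContext servletContext,
        long tokenValidity) throws Exception {
      // "my.fixed.secret" is a made-up property name for this sketch; the
      // config object carries the filter's de-prefixed init parameters.
      secret = config.getProperty("my.fixed.secret", "change-me").getBytes();
    }

    @Override
    public byte[] getCurrentSecret() {
      return secret;
    }

    @Override
    public byte[][] getAllSecrets() {
      return new byte[][]{secret};
    }
  }

A real provider would typically fetch its secret from a credential store and release any resources in destroy().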

    + * The relevant configuration properties are: + *

    + *
  • signer.secret.provider: indicates the name of the SignerSecretProvider class to use. Possible values are: "string", "random", "zookeeper", or a classname. If not specified, the "string" implementation will be used with [#PREFIX#.]signature.secret; and if that's not specified, the "random" implementation will be used.
  • [#PREFIX#.]signature.secret: used as the secret by the "string" implementation, as described above.
  • [#PREFIX#.]token.validity: used as the rollover interval by the "random" and "zookeeper" implementations, as described above.
  • + *

    + * The "zookeeper" implementation has additional configuration properties that + * must be specified; see {@link ZKSignerSecretProvider} for details. + *

    + * Subclasses of AuthenticationFilter that want additional control over the + * SignerSecretProvider can set the following attribute in the + * ServletContext (a sketch follows below): + *
  • signer.secret.provider.object: a SignerSecretProvider implementation can be passed here that will be used instead of the one named by the signer.secret.provider configuration property. Note that it should already be initialized.
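A sketch of how a webapp could hand a pre-initialized provider to the filter through that attribute; the listener class name is hypothetical, FixedSecretProvider refers to the earlier sketch, and the Properties and validity values are placeholders:

  import java.util.Properties;
  import javax.servlet.ServletContextEvent;
  import javax.servlet.ServletContextListener;
  import org.apache.hadoop.security.authentication.util.SignerSecretProvider;

  public class SecretProviderInitializer implements ServletContextListener {

    @Override
    public void contextInitialized(ServletContextEvent sce) {
      try {
        SignerSecretProvider provider = new FixedSecretProvider();
        // The filter expects the provider to be initialized already.
        provider.init(new Properties(), sce.getServletContext(), 3600 * 1000L);
        // "signer.secret.provider.object" is the value of
        // AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE defined below.
        sce.getServletContext()
            .setAttribute("signer.secret.provider.object", provider);
      } catch (Exception e) {
        throw new RuntimeException("Could not initialize secret provider", e);
      }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
      // Nothing to clean up in this sketch.
    }
  }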

    */ @InterfaceAudience.Private @@ -112,20 +159,23 @@ public class AuthenticationFilter implements Filter { /** * Constant for the configuration property that indicates the name of the - * SignerSecretProvider class to use. If not specified, SIGNATURE_SECRET - * will be used or a random secret. + * SignerSecretProvider class to use. + * Possible values are: "string", "random", "zookeeper", or a classname. + * If not specified, the "string" implementation will be used with + * SIGNATURE_SECRET; and if that's not specified, the "random" implementation + * will be used. */ - public static final String SIGNER_SECRET_PROVIDER_CLASS = + public static final String SIGNER_SECRET_PROVIDER = "signer.secret.provider"; /** - * Constant for the attribute that can be used for providing a custom - * object that subclasses the SignerSecretProvider. Note that this should be - * set in the ServletContext and the class should already be initialized. - * If not specified, SIGNER_SECRET_PROVIDER_CLASS will be used. + * Constant for the ServletContext attribute that can be used for providing a + * custom implementation of the SignerSecretProvider. Note that the class + * should already be initialized. If not specified, SIGNER_SECRET_PROVIDER + * will be used. */ - public static final String SIGNATURE_PROVIDER_ATTRIBUTE = - "org.apache.hadoop.security.authentication.util.SignerSecretProvider"; + public static final String SIGNER_SECRET_PROVIDER_ATTRIBUTE = + "signer.secret.provider.object"; private Properties config; private Signer signer; @@ -138,7 +188,7 @@ public class AuthenticationFilter implements Filter { private String cookiePath; /** - * Initializes the authentication filter. + * Initializes the authentication filter and signer secret provider. *

    * It instantiates and initializes the specified {@link AuthenticationHandler}. *

    @@ -184,35 +234,19 @@ public void init(FilterConfig filterConfig) throws ServletException { validity = Long.parseLong(config.getProperty(AUTH_TOKEN_VALIDITY, "36000")) * 1000; //10 hours secretProvider = (SignerSecretProvider) filterConfig.getServletContext(). - getAttribute(SIGNATURE_PROVIDER_ATTRIBUTE); + getAttribute(SIGNER_SECRET_PROVIDER_ATTRIBUTE); if (secretProvider == null) { - String signerSecretProviderClassName = - config.getProperty(configPrefix + SIGNER_SECRET_PROVIDER_CLASS, null); - if (signerSecretProviderClassName == null) { - String signatureSecret = - config.getProperty(configPrefix + SIGNATURE_SECRET, null); - if (signatureSecret != null) { - secretProvider = new StringSignerSecretProvider(signatureSecret); - } else { - secretProvider = new RandomSignerSecretProvider(); - randomSecret = true; - } - } else { - try { - Class klass = Thread.currentThread().getContextClassLoader(). - loadClass(signerSecretProviderClassName); - secretProvider = (SignerSecretProvider) klass.newInstance(); - customSecretProvider = true; - } catch (ClassNotFoundException ex) { - throw new ServletException(ex); - } catch (InstantiationException ex) { - throw new ServletException(ex); - } catch (IllegalAccessException ex) { - throw new ServletException(ex); - } + Class providerClass + = getProviderClass(config); + try { + secretProvider = providerClass.newInstance(); + } catch (InstantiationException ex) { + throw new ServletException(ex); + } catch (IllegalAccessException ex) { + throw new ServletException(ex); } try { - secretProvider.init(config, validity); + secretProvider.init(config, filterConfig.getServletContext(), validity); } catch (Exception ex) { throw new ServletException(ex); } @@ -225,6 +259,42 @@ public void init(FilterConfig filterConfig) throws ServletException { cookiePath = config.getProperty(COOKIE_PATH, null); } + @SuppressWarnings("unchecked") + private Class getProviderClass(Properties config) + throws ServletException { + String providerClassName; + String signerSecretProviderName + = config.getProperty(SIGNER_SECRET_PROVIDER, null); + // fallback to old behavior + if (signerSecretProviderName == null) { + String signatureSecret = config.getProperty(SIGNATURE_SECRET, null); + if (signatureSecret != null) { + providerClassName = StringSignerSecretProvider.class.getName(); + } else { + providerClassName = RandomSignerSecretProvider.class.getName(); + randomSecret = true; + } + } else { + if ("random".equals(signerSecretProviderName)) { + providerClassName = RandomSignerSecretProvider.class.getName(); + randomSecret = true; + } else if ("string".equals(signerSecretProviderName)) { + providerClassName = StringSignerSecretProvider.class.getName(); + } else if ("zookeeper".equals(signerSecretProviderName)) { + providerClassName = ZKSignerSecretProvider.class.getName(); + } else { + providerClassName = signerSecretProviderName; + customSecretProvider = true; + } + } + try { + return (Class) Thread.currentThread(). + getContextClassLoader().loadClass(providerClassName); + } catch (ClassNotFoundException ex) { + throw new ServletException(ex); + } + } + /** * Returns the configuration properties of the {@link AuthenticationFilter} * without the prefix. 
The returned properties are the same that the diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RandomSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RandomSignerSecretProvider.java index 5491a8671b..29e5661cb0 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RandomSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RandomSignerSecretProvider.java @@ -13,12 +13,13 @@ */ package org.apache.hadoop.security.authentication.util; +import com.google.common.annotations.VisibleForTesting; import java.util.Random; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** - * A SignerSecretProvider that uses a random number as it's secret. It rolls + * A SignerSecretProvider that uses a random number as its secret. It rolls * the secret at a regular interval. */ @InterfaceStability.Unstable @@ -37,6 +38,7 @@ public RandomSignerSecretProvider() { * is meant for testing. * @param seed the seed for the random number generator */ + @VisibleForTesting public RandomSignerSecretProvider(long seed) { super(); rand = new Random(seed); diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RolloverSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RolloverSignerSecretProvider.java index ec6e601b4d..bdca3e4eb9 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RolloverSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/RolloverSignerSecretProvider.java @@ -17,6 +17,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.servlet.ServletContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.slf4j.Logger; @@ -57,12 +58,14 @@ public RolloverSignerSecretProvider() { * Initialize the SignerSecretProvider. It initializes the current secret * and starts the scheduler for the rollover to run at an interval of * tokenValidity. 
- * @param config filter configuration + * @param config configuration properties + * @param servletContext servlet context * @param tokenValidity The amount of time a token is valid for * @throws Exception */ @Override - public void init(Properties config, long tokenValidity) throws Exception { + public void init(Properties config, ServletContext servletContext, + long tokenValidity) throws Exception { initSecrets(generateNewSecret(), null); startScheduler(tokenValidity, tokenValidity); } diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/SignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/SignerSecretProvider.java index a4d98d784f..2e0b985489 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/SignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/SignerSecretProvider.java @@ -14,6 +14,7 @@ package org.apache.hadoop.security.authentication.util; import java.util.Properties; +import javax.servlet.ServletContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -30,13 +31,13 @@ public abstract class SignerSecretProvider { /** * Initialize the SignerSecretProvider - * @param config filter configuration + * @param config configuration properties + * @param servletContext servlet context * @param tokenValidity The amount of time a token is valid for * @throws Exception */ - public abstract void init(Properties config, long tokenValidity) - throws Exception; - + public abstract void init(Properties config, ServletContext servletContext, + long tokenValidity) throws Exception; /** * Will be called on shutdown; subclasses should perform any cleanup here. */ diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/StringSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/StringSignerSecretProvider.java index 230059b645..7aaccd2914 100644 --- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/StringSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/StringSignerSecretProvider.java @@ -14,8 +14,10 @@ package org.apache.hadoop.security.authentication.util; import java.util.Properties; +import javax.servlet.ServletContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; /** * A SignerSecretProvider that simply creates a secret based on a given String. 
@@ -27,14 +29,15 @@ public class StringSignerSecretProvider extends SignerSecretProvider { private byte[] secret; private byte[][] secrets; - public StringSignerSecretProvider(String secretStr) { - secret = secretStr.getBytes(); - secrets = new byte[][]{secret}; - } + public StringSignerSecretProvider() {} @Override - public void init(Properties config, long tokenValidity) throws Exception { - // do nothing + public void init(Properties config, ServletContext servletContext, + long tokenValidity) throws Exception { + String signatureSecret = config.getProperty( + AuthenticationFilter.SIGNATURE_SECRET, null); + secret = signatureSecret.getBytes(); + secrets = new byte[][]{secret}; } @Override diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java new file mode 100644 index 0000000000..a17b6d495d --- /dev/null +++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java @@ -0,0 +1,506 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package org.apache.hadoop.security.authentication.util; + +import com.google.common.annotations.VisibleForTesting; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.servlet.ServletContext; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.imps.DefaultACLProvider; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Perms; +import org.apache.zookeeper.client.ZooKeeperSaslClient; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A SignerSecretProvider that synchronizes a rolling random secret between + * multiple servers using ZooKeeper. + *

    + * It works by storing the secrets and next rollover time in a ZooKeeper znode. + * All ZKSignerSecretProviders looking at that znode will use those + * secrets and next rollover time to ensure they are synchronized. There is no + * "leader" -- any of the ZKSignerSecretProviders can choose the next secret; + * which one is indeterminate. Kerberos-based ACLs can also be enforced to + * prevent a malicious third-party from getting or setting the secrets. It uses + * its own CuratorFramework client for talking to ZooKeeper. If you want to use + * your own Curator client, you can pass it to ZKSignerSecretProvider; see + * {@link org.apache.hadoop.security.authentication.server.AuthenticationFilter} + * for more details. + *
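A minimal sketch of supplying your own Curator client through the ServletContext attribute defined further down ("signer.secret.provider.zookeeper.curator.client"); the listener class name and connection string are placeholders:

  import javax.servlet.ServletContextEvent;
  import javax.servlet.ServletContextListener;
  import org.apache.curator.framework.CuratorFramework;
  import org.apache.curator.framework.CuratorFrameworkFactory;
  import org.apache.curator.retry.ExponentialBackoffRetry;

  public class CuratorClientInitializer implements ServletContextListener {
    private CuratorFramework client;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
      client = CuratorFrameworkFactory.builder()
          .connectString("zk1.example.com:2181")   // placeholder quorum
          .retryPolicy(new ExponentialBackoffRetry(1000, 3))
          .build();
      client.start();
      sce.getServletContext().setAttribute(
          "signer.secret.provider.zookeeper.curator.client", client);
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
      // Only needed when signer.secret.provider.zookeeper.disconnect.on.shutdown
      // is set to "false"; by default ZKSignerSecretProvider closes the client.
      if (client != null) {
        client.close();
      }
    }
  }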

    + * The supported configuration properties are: + *

    + *
  • signer.secret.provider.zookeeper.connection.string: the ZooKeeper connection string to connect with.
  • signer.secret.provider.zookeeper.path: the ZooKeeper path to use for storing and retrieving the secrets. All servers that need to coordinate their secret should point to the same path.
  • signer.secret.provider.zookeeper.auth.type: the auth type to use. Supported values are "none" and "sasl". The default value is "none".
  • signer.secret.provider.zookeeper.kerberos.keytab: the path to the Kerberos keytab file. This is only required if using Kerberos.
  • signer.secret.provider.zookeeper.kerberos.principal: the Kerberos principal to use. This is only required if using Kerberos.
  • signer.secret.provider.zookeeper.disconnect.on.shutdown: whether to disconnect from ZooKeeper on shutdown. The default is "true"; only set this to "false" if a custom Curator client is being provided and the disconnection is handled elsewhere.
  • + * The following attribute in the ServletContext can also be set if desired: + *
  • signer.secret.provider.zookeeper.curator.client: A CuratorFramework + * client object can be passed here. If given, the "zookeeper" implementation + * will use this Curator client instead of creating its own, which is useful if + * you already have a Curator client or want more control over its + * configuration.
  • + */ +@InterfaceStability.Unstable +@InterfaceAudience.Private +public class ZKSignerSecretProvider extends RolloverSignerSecretProvider { + + private static final String CONFIG_PREFIX = + "signer.secret.provider.zookeeper."; + + /** + * Constant for the property that specifies the ZooKeeper connection string. + */ + public static final String ZOOKEEPER_CONNECTION_STRING = + CONFIG_PREFIX + "connection.string"; + + /** + * Constant for the property that specifies the ZooKeeper path. + */ + public static final String ZOOKEEPER_PATH = CONFIG_PREFIX + "path"; + + /** + * Constant for the property that specifies the auth type to use. Supported + * values are "none" and "sasl". The default value is "none". + */ + public static final String ZOOKEEPER_AUTH_TYPE = CONFIG_PREFIX + "auth.type"; + + /** + * Constant for the property that specifies the Kerberos keytab file. + */ + public static final String ZOOKEEPER_KERBEROS_KEYTAB = + CONFIG_PREFIX + "kerberos.keytab"; + + /** + * Constant for the property that specifies the Kerberos principal. + */ + public static final String ZOOKEEPER_KERBEROS_PRINCIPAL = + CONFIG_PREFIX + "kerberos.principal"; + + /** + * Constant for the property that specifies whether or not the Curator client + * should disconnect from ZooKeeper on shutdown. The default is "true". Only + * set this to "false" if a custom Curator client is being provided and the + * disconnection is being handled elsewhere. + */ + public static final String DISCONNECT_FROM_ZOOKEEPER_ON_SHUTDOWN = + CONFIG_PREFIX + "disconnect.on.shutdown"; + + /** + * Constant for the ServletContext attribute that can be used for providing a + * custom CuratorFramework client. If set ZKSignerSecretProvider will use this + * Curator client instead of creating a new one. The providing class is + * responsible for creating and configuring the Curator client (including + * security and ACLs) in this case. + */ + public static final String + ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE = + CONFIG_PREFIX + "curator.client"; + + private static final String JAAS_LOGIN_ENTRY_NAME = + "ZKSignerSecretProviderClient"; + + private static Logger LOG = LoggerFactory.getLogger( + ZKSignerSecretProvider.class); + private String path; + /** + * Stores the next secret that will be used after the current one rolls over. + * We do this to help with rollover performance by actually deciding the next + * secret at the previous rollover. This allows us to switch to the next + * secret very quickly. Afterwards, we have plenty of time to decide on the + * next secret. + */ + private volatile byte[] nextSecret; + private final Random rand; + /** + * Stores the current version of the znode. + */ + private int zkVersion; + /** + * Stores the next date that the rollover will occur. This is only used + * for allowing new servers joining later to synchronize their rollover + * with everyone else. + */ + private long nextRolloverDate; + private long tokenValidity; + private CuratorFramework client; + private boolean shouldDisconnect; + private static int INT_BYTES = Integer.SIZE / Byte.SIZE; + private static int LONG_BYTES = Long.SIZE / Byte.SIZE; + private static int DATA_VERSION = 0; + + public ZKSignerSecretProvider() { + super(); + rand = new Random(); + } + + /** + * This constructor lets you set the seed of the Random Number Generator and + * is meant for testing. 
+ * @param seed the seed for the random number generator + */ + @VisibleForTesting + public ZKSignerSecretProvider(long seed) { + super(); + rand = new Random(seed); + } + + @Override + public void init(Properties config, ServletContext servletContext, + long tokenValidity) throws Exception { + Object curatorClientObj = servletContext.getAttribute( + ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE); + if (curatorClientObj != null + && curatorClientObj instanceof CuratorFramework) { + client = (CuratorFramework) curatorClientObj; + } else { + client = createCuratorClient(config); + } + this.tokenValidity = tokenValidity; + shouldDisconnect = Boolean.parseBoolean( + config.getProperty(DISCONNECT_FROM_ZOOKEEPER_ON_SHUTDOWN, "true")); + path = config.getProperty(ZOOKEEPER_PATH); + if (path == null) { + throw new IllegalArgumentException(ZOOKEEPER_PATH + + " must be specified"); + } + try { + nextRolloverDate = System.currentTimeMillis() + tokenValidity; + // everyone tries to do this, only one will succeed and only when the + // znode doesn't already exist. Everyone else will synchronize on the + // data from the znode + client.create().creatingParentsIfNeeded() + .forPath(path, generateZKData(generateRandomSecret(), + generateRandomSecret(), null)); + zkVersion = 0; + LOG.info("Creating secret znode"); + } catch (KeeperException.NodeExistsException nee) { + LOG.info("The secret znode already exists, retrieving data"); + } + // Synchronize on the data from the znode + // passing true tells it to parse out all the data for initing + pullFromZK(true); + long initialDelay = nextRolloverDate - System.currentTimeMillis(); + // If it's in the past, try to find the next interval that we should + // be using + if (initialDelay < 1l) { + int i = 1; + while (initialDelay < 1l) { + initialDelay = nextRolloverDate + tokenValidity * i + - System.currentTimeMillis(); + i++; + } + } + super.startScheduler(initialDelay, tokenValidity); + } + + /** + * Disconnects from ZooKeeper unless told not to. + */ + @Override + public void destroy() { + if (shouldDisconnect && client != null) { + client.close(); + } + super.destroy(); + } + + @Override + protected synchronized void rollSecret() { + super.rollSecret(); + // Try to push the information to ZooKeeper with a potential next secret. + nextRolloverDate += tokenValidity; + byte[][] secrets = super.getAllSecrets(); + pushToZK(generateRandomSecret(), secrets[0], secrets[1]); + // Pull info from ZooKeeper to get the decided next secret + // passing false tells it that we don't care about most of the data + pullFromZK(false); + } + + @Override + protected byte[] generateNewSecret() { + // We simply return nextSecret because it's already been decided on + return nextSecret; + } + + /** + * Pushes proposed data to ZooKeeper. If a different server pushes its data + * first, it gives up. 
+ * @param newSecret The new secret to use + * @param currentSecret The current secret + * @param previousSecret The previous secret + */ + private synchronized void pushToZK(byte[] newSecret, byte[] currentSecret, + byte[] previousSecret) { + byte[] bytes = generateZKData(newSecret, currentSecret, previousSecret); + try { + client.setData().withVersion(zkVersion).forPath(path, bytes); + } catch (KeeperException.BadVersionException bve) { + LOG.debug("Unable to push to znode; another server already did it"); + } catch (Exception ex) { + LOG.error("An unexpected exception occurred pushing data to ZooKeeper", + ex); + } + } + + /** + * Serialize the data to attempt to push into ZooKeeper. The format is this: + *

    + * [DATA_VERSION, newSecretLength, newSecret, currentSecretLength, currentSecret, previousSecretLength, previousSecret, nextRolloverDate] + *

    + * Only previousSecret can be null, in which case the format looks like this: + *

    + * [DATA_VERSION, newSecretLength, newSecret, currentSecretLength, currentSecret, 0, nextRolloverDate] + *
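A simplified sketch of the write side of that layout (the class's actual serialization lives in generateZKData and pullFromZK below); INT_BYTES and LONG_BYTES mirror the constants defined above:

  import java.nio.ByteBuffer;

  public class ZnodeLayoutSketch {
    private static final int INT_BYTES = Integer.SIZE / Byte.SIZE;   // 4
    private static final int LONG_BYTES = Long.SIZE / Byte.SIZE;     // 8

    static byte[] serialize(int dataVersion, byte[] next, byte[] current,
        byte[] previous, long nextRolloverDate) {
      int prevLen = (previous == null) ? 0 : previous.length;
      ByteBuffer bb = ByteBuffer.allocate(
          INT_BYTES                       // data version
          + INT_BYTES + next.length       // next secret length + bytes
          + INT_BYTES + current.length    // current secret length + bytes
          + INT_BYTES + prevLen           // previous secret length + bytes (may be 0)
          + LONG_BYTES);                  // next rollover date
      bb.putInt(dataVersion);
      bb.putInt(next.length).put(next);
      bb.putInt(current.length).put(current);
      bb.putInt(prevLen);
      if (prevLen > 0) {
        bb.put(previous);
      }
      bb.putLong(nextRolloverDate);
      return bb.array();
    }
  }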

    + * @param newSecret The new secret to use + * @param currentSecret The current secret + * @param previousSecret The previous secret + * @return The serialized data for ZooKeeper + */ + private synchronized byte[] generateZKData(byte[] newSecret, + byte[] currentSecret, byte[] previousSecret) { + int newSecretLength = newSecret.length; + int currentSecretLength = currentSecret.length; + int previousSecretLength = 0; + if (previousSecret != null) { + previousSecretLength = previousSecret.length; + } + ByteBuffer bb = ByteBuffer.allocate(INT_BYTES + INT_BYTES + newSecretLength + + INT_BYTES + currentSecretLength + INT_BYTES + previousSecretLength + + LONG_BYTES); + bb.putInt(DATA_VERSION); + bb.putInt(newSecretLength); + bb.put(newSecret); + bb.putInt(currentSecretLength); + bb.put(currentSecret); + bb.putInt(previousSecretLength); + if (previousSecretLength > 0) { + bb.put(previousSecret); + } + bb.putLong(nextRolloverDate); + return bb.array(); + } + + /** + * Pulls data from ZooKeeper. If isInit is false, it will only parse the + * next secret and version. If isInit is true, it will also parse the current + * and previous secrets, and the next rollover date; it will also init the + * secrets. Hence, isInit should only be true on startup. + * @param isInit see description above + */ + private synchronized void pullFromZK(boolean isInit) { + try { + Stat stat = new Stat(); + byte[] bytes = client.getData().storingStatIn(stat).forPath(path); + ByteBuffer bb = ByteBuffer.wrap(bytes); + int dataVersion = bb.getInt(); + if (dataVersion > DATA_VERSION) { + throw new IllegalStateException("Cannot load data from ZooKeeper; it" + + "was written with a newer version"); + } + int nextSecretLength = bb.getInt(); + byte[] nextSecret = new byte[nextSecretLength]; + bb.get(nextSecret); + this.nextSecret = nextSecret; + zkVersion = stat.getVersion(); + if (isInit) { + int currentSecretLength = bb.getInt(); + byte[] currentSecret = new byte[currentSecretLength]; + bb.get(currentSecret); + int previousSecretLength = bb.getInt(); + byte[] previousSecret = null; + if (previousSecretLength > 0) { + previousSecret = new byte[previousSecretLength]; + bb.get(previousSecret); + } + super.initSecrets(currentSecret, previousSecret); + nextRolloverDate = bb.getLong(); + } + } catch (Exception ex) { + LOG.error("An unexpected exception occurred while pulling data from" + + "ZooKeeper", ex); + } + } + + private byte[] generateRandomSecret() { + return Long.toString(rand.nextLong()).getBytes(); + } + + /** + * This method creates the Curator client and connects to ZooKeeper. 
+ * @param config configuration properties + * @return A Curator client + * @throws java.lang.Exception + */ + protected CuratorFramework createCuratorClient(Properties config) + throws Exception { + String connectionString = config.getProperty( + ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + ACLProvider aclProvider; + String authType = config.getProperty(ZOOKEEPER_AUTH_TYPE, "none"); + if (authType.equals("sasl")) { + LOG.info("Connecting to ZooKeeper with SASL/Kerberos" + + "and using 'sasl' ACLs"); + String principal = setJaasConfiguration(config); + System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, + JAAS_LOGIN_ENTRY_NAME); + System.setProperty("zookeeper.authProvider.1", + "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); + aclProvider = new SASLOwnerACLProvider(principal); + } else { // "none" + LOG.info("Connecting to ZooKeeper without authentication"); + aclProvider = new DefaultACLProvider(); // open to everyone + } + CuratorFramework cf = CuratorFrameworkFactory.builder() + .connectString(connectionString) + .retryPolicy(retryPolicy) + .aclProvider(aclProvider) + .build(); + cf.start(); + return cf; + } + + private String setJaasConfiguration(Properties config) throws Exception { + String keytabFile = config.getProperty(ZOOKEEPER_KERBEROS_KEYTAB).trim(); + if (keytabFile == null || keytabFile.length() == 0) { + throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_KEYTAB + + " must be specified"); + } + String principal = config.getProperty(ZOOKEEPER_KERBEROS_PRINCIPAL) + .trim(); + if (principal == null || principal.length() == 0) { + throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_PRINCIPAL + + " must be specified"); + } + + // This is equivalent to writing a jaas.conf file and setting the system + // property, "java.security.auth.login.config", to point to it + JaasConfiguration jConf = + new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile); + Configuration.setConfiguration(jConf); + return principal.split("[/@]")[0]; + } + + /** + * Simple implementation of an {@link ACLProvider} that simply returns an ACL + * that gives all permissions only to a single principal. + */ + private static class SASLOwnerACLProvider implements ACLProvider { + + private final List saslACL; + + private SASLOwnerACLProvider(String principal) { + this.saslACL = Collections.singletonList( + new ACL(Perms.ALL, new Id("sasl", principal))); + } + + @Override + public List getDefaultAcl() { + return saslACL; + } + + @Override + public List getAclForPath(String path) { + return saslACL; + } + } + + /** + * Creates a programmatic version of a jaas.conf file. This can be used + * instead of writing a jaas.conf file and setting the system property, + * "java.security.auth.login.config", to point to that file. It is meant to be + * used for connecting to ZooKeeper. + */ + @InterfaceAudience.Private + public static class JaasConfiguration extends Configuration { + + private static AppConfigurationEntry[] entry; + private String entryName; + + /** + * Add an entry to the jaas configuration with the passed in name, + * principal, and keytab. The other necessary options will be set for you. + * + * @param entryName The name of the entry (e.g. 
"Client") + * @param principal The principal of the user + * @param keytab The location of the keytab + */ + public JaasConfiguration(String entryName, String principal, String keytab) { + this.entryName = entryName; + Map options = new HashMap(); + options.put("keyTab", keytab); + options.put("principal", principal); + options.put("useKeyTab", "true"); + options.put("storeKey", "true"); + options.put("useTicketCache", "false"); + options.put("refreshKrb5Config", "true"); + String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG"); + if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) { + options.put("debug", "true"); + } + entry = new AppConfigurationEntry[]{ + new AppConfigurationEntry(getKrb5LoginModuleName(), + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + options)}; + } + + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + return (entryName.equals(name)) ? entry : null; + } + + private String getKrb5LoginModuleName() { + String krb5LoginModuleName; + if (System.getProperty("java.vendor").contains("IBM")) { + krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule"; + } else { + krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; + } + return krb5LoginModuleName; + } + } +} diff --git a/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm b/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm index 6377393355..88248e5237 100644 --- a/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm +++ b/hadoop-common-project/hadoop-auth/src/site/apt/Configuration.apt.vm @@ -45,14 +45,14 @@ Configuration * <<<[PREFIX.]type>>>: the authentication type keyword (<<>> or <<>>) or a Authentication handler implementation. - * <<<[PREFIX.]signature.secret>>>: The secret to SHA-sign the generated - authentication tokens. If a secret is not provided a random secret is - generated at start up time. If using multiple web application instances - behind a load-balancer a secret must be set for the application to work - properly. + * <<<[PREFIX.]signature.secret>>>: When <<>> is set to + <<>> or not specified, this is the value for the secret used to sign + the HTTP cookie. * <<<[PREFIX.]token.validity>>>: The validity -in seconds- of the generated - authentication token. The default value is <<<3600>>> seconds. + authentication token. The default value is <<<3600>>> seconds. This is also + used for the rollover interval when <<>> is set to + <<>> or <<>>. * <<<[PREFIX.]cookie.domain>>>: domain to use for the HTTP cookie that stores the authentication token. @@ -60,6 +60,12 @@ Configuration * <<<[PREFIX.]cookie.path>>>: path to use for the HTTP cookie that stores the authentication token. + * <<>>: indicates the name of the SignerSecretProvider + class to use. Possible values are: <<>>, <<>>, + <<>>, or a classname. If not specified, the <<>> + implementation will be used; and failing that, the <<>> + implementation will be used. + ** Kerberos Configuration <>: A KDC must be configured and running. @@ -239,3 +245,133 @@ Configuration ... +---+ + +** SignerSecretProvider Configuration + + The SignerSecretProvider is used to provide more advanced behaviors for the + secret used for signing the HTTP Cookies. + + These are the relevant configuration properties: + + * <<>>: indicates the name of the + SignerSecretProvider class to use. Possible values are: "string", + "random", "zookeeper", or a classname. 
If not specified, the "string" + implementation will be used; and failing that, the "random" implementation + will be used. + + * <<<[PREFIX.]signature.secret>>>: When <<>> is set + to <<>> or not specified, this is the value for the secret used to + sign the HTTP cookie. + + * <<<[PREFIX.]token.validity>>>: The validity -in seconds- of the generated + authentication token. The default value is <<<3600>>> seconds. This is + also used for the rollover interval when <<>> is + set to <<>> or <<>>. + + The following configuration properties are specific to the <<>> + implementation: + + * <<>>: Indicates the + ZooKeeper connection string to connect with. + + * <<>>: Indicates the ZooKeeper path + to use for storing and retrieving the secrets. All servers + that need to coordinate their secret should point to the same path + + * <<>>: Indicates the auth type + to use. Supported values are <<>> and <<>>. The default + value is <<>>. + + * <<>>: Set this to the + path with the Kerberos keytab file. This is only required if using + Kerberos. + + * <<>>: Set this to the + Kerberos principal to use. This only required if using Kerberos. + + <>: + ++---+ + + ... + + + + + signer.secret.provider + string + + + signature.secret + my_secret + + + + ... + ++---+ + + <>: + ++---+ + + ... + + + + + signer.secret.provider + random + + + token.validity + 30 + + + + ... + ++---+ + + <>: + ++---+ + + ... + + + + + signer.secret.provider + zookeeper + + + token.validity + 30 + + + signer.secret.provider.zookeeper.connection.string + zoo1:2181,zoo2:2181,zoo3:2181 + + + signer.secret.provider.zookeeper.path + /myapp/secrets + + + signer.secret.provider.zookeeper.use.kerberos.acls + true + + + signer.secret.provider.zookeeper.kerberos.keytab + /tmp/auth.keytab + + + signer.secret.provider.zookeeper.kerberos.principal + HTTP/localhost@LOCALHOST + + + + ... + ++---+ + diff --git a/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm b/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm index 6051f8cbf2..bf85f7f41b 100644 --- a/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm +++ b/hadoop-common-project/hadoop-auth/src/site/apt/index.apt.vm @@ -44,6 +44,11 @@ Hadoop Auth, Java HTTP SPNEGO ${project.version} Subsequent HTTP client requests presenting the signed HTTP Cookie have access to the protected resources until the HTTP Cookie expires. + The secret used to sign the HTTP Cookie has multiple implementations that + provide different behaviors, including a hardcoded secret string, a rolling + randomly generated secret, and a rolling randomly generated secret + synchronized between multiple servers using ZooKeeper. 
+ * User Documentation * {{{./Examples.html}Examples}} diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java index a9a5e8c738..5d93fcfa1c 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/server/TestAuthenticationFilter.java @@ -162,7 +162,8 @@ public void testInit() throws Exception { AuthenticationFilter.AUTH_TOKEN_VALIDITY)).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); Assert.assertEquals(PseudoAuthenticationHandler.class, filter.getAuthenticationHandler().getClass()); @@ -186,7 +187,8 @@ public void testInit() throws Exception { AuthenticationFilter.SIGNATURE_SECRET)).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); Assert.assertFalse(filter.isRandomSecret()); @@ -206,10 +208,11 @@ public void testInit() throws Exception { AuthenticationFilter.SIGNATURE_SECRET)).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn( + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)).thenReturn( new SignerSecretProvider() { @Override - public void init(Properties config, long tokenValidity) { + public void init(Properties config, ServletContext servletContext, + long tokenValidity) { } @Override public byte[] getCurrentSecret() { @@ -241,7 +244,8 @@ public byte[][] getAllSecrets() { AuthenticationFilter.COOKIE_PATH)).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); Assert.assertEquals(".foo.com", filter.getCookieDomain()); @@ -265,7 +269,8 @@ public byte[][] getAllSecrets() { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); Assert.assertTrue(DummyAuthenticationHandler.init); @@ -304,7 +309,8 @@ public void testInitCaseSensitivity() throws Exception { AuthenticationFilter.AUTH_TOKEN_VALIDITY)).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + 
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -330,7 +336,8 @@ public void testGetRequestURL() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -361,13 +368,20 @@ public void testGetToken() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE); token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -398,14 +412,21 @@ public void testGetTokenExpired() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE); token.setExpires(System.currentTimeMillis() - TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -443,13 +464,20 @@ public void testGetTokenInvalidType() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); AuthenticationToken token = new AuthenticationToken("u", "p", "invalidtype"); token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new 
StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -485,7 +513,8 @@ public void testDoFilterNotAuthenticated() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -538,7 +567,8 @@ private void _testDoFilterAuthentication(boolean withDomainPath, ".return", "expired.token")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); if (withDomainPath) { @@ -593,7 +623,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable { Mockito.verify(chain).doFilter(Mockito.any(ServletRequest.class), Mockito.any(ServletResponse.class)); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String value = signer.verifyAndExtract(v); AuthenticationToken token = AuthenticationToken.parse(value); assertThat(token.getExpires(), not(0L)); @@ -662,7 +698,8 @@ public void testDoFilterAuthenticated() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -671,7 +708,13 @@ public void testDoFilterAuthenticated() throws Exception { AuthenticationToken token = new AuthenticationToken("u", "p", "t"); token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -716,7 +759,8 @@ public void testDoFilterAuthenticationFailure() throws Exception { "management.operation.return")).elements()); ServletContext context = 
Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -783,7 +827,8 @@ public void testDoFilterAuthenticatedExpired() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -792,7 +837,13 @@ public void testDoFilterAuthenticatedExpired() throws Exception { AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE); token.setExpires(System.currentTimeMillis() - TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider(secret)); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, secret); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -854,7 +905,8 @@ public void testDoFilterAuthenticatedInvalidType() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -863,7 +915,13 @@ public void testDoFilterAuthenticatedInvalidType() throws Exception { AuthenticationToken token = new AuthenticationToken("u", "p", "invalidtype"); token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider(secret)); + StringSignerSecretProvider secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, secret); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); @@ -893,7 +951,8 @@ public void testManagementOperation() throws Exception { "management.operation.return")).elements()); ServletContext context = Mockito.mock(ServletContext.class); Mockito.when(context.getAttribute( - AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null); + AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)) + .thenReturn(null); Mockito.when(config.getServletContext()).thenReturn(context); filter.init(config); @@ -914,7 +973,13 @@ public void testManagementOperation() throws Exception { AuthenticationToken token = new AuthenticationToken("u", "p", "t"); token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider 
secretProvider + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned); Mockito.when(request.getCookies()).thenReturn(new Cookie[]{cookie}); diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestJaasConfiguration.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestJaasConfiguration.java new file mode 100644 index 0000000000..2b70135800 --- /dev/null +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestJaasConfiguration.java @@ -0,0 +1,55 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. See accompanying LICENSE file. + */ +package org.apache.hadoop.security.authentication.util; + +import java.util.Map; +import javax.security.auth.login.AppConfigurationEntry; +import org.junit.Assert; +import org.junit.Test; + +public class TestJaasConfiguration { + + // We won't test actually using it to authenticate because that gets messy and + // may conflict with other tests; but we can test that it otherwise behaves + // correctly + @Test + public void test() throws Exception { + String krb5LoginModuleName; + if (System.getProperty("java.vendor").contains("IBM")) { + krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule"; + } else { + krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; + } + + ZKSignerSecretProvider.JaasConfiguration jConf = + new ZKSignerSecretProvider.JaasConfiguration("foo", "foo/localhost", + "/some/location/foo.keytab"); + AppConfigurationEntry[] entries = jConf.getAppConfigurationEntry("bar"); + Assert.assertNull(entries); + entries = jConf.getAppConfigurationEntry("foo"); + Assert.assertEquals(1, entries.length); + AppConfigurationEntry entry = entries[0]; + Assert.assertEquals(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + entry.getControlFlag()); + Assert.assertEquals(krb5LoginModuleName, entry.getLoginModuleName()); + Map options = entry.getOptions(); + Assert.assertEquals("/some/location/foo.keytab", options.get("keyTab")); + Assert.assertEquals("foo/localhost", options.get("principal")); + Assert.assertEquals("true", options.get("useKeyTab")); + Assert.assertEquals("true", options.get("storeKey")); + Assert.assertEquals("false", options.get("useTicketCache")); + Assert.assertEquals("true", options.get("refreshKrb5Config")); + Assert.assertEquals(6, options.size()); + } +} diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRandomSignerSecretProvider.java 
b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRandomSignerSecretProvider.java index c3384ad03b..41d4967eac 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRandomSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRandomSignerSecretProvider.java @@ -31,7 +31,7 @@ public void testGetAndRollSecrets() throws Exception { RandomSignerSecretProvider secretProvider = new RandomSignerSecretProvider(seed); try { - secretProvider.init(null, rolloverFrequency); + secretProvider.init(null, null, rolloverFrequency); byte[] currentSecret = secretProvider.getCurrentSecret(); byte[][] allSecrets = secretProvider.getAllSecrets(); diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRolloverSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRolloverSignerSecretProvider.java index 2a2986af9c..1e40c42326 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRolloverSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestRolloverSignerSecretProvider.java @@ -28,7 +28,7 @@ public void testGetAndRollSecrets() throws Exception { new TRolloverSignerSecretProvider( new byte[][]{secret1, secret2, secret3}); try { - secretProvider.init(null, rolloverFrequency); + secretProvider.init(null, null, rolloverFrequency); byte[] currentSecret = secretProvider.getCurrentSecret(); byte[][] allSecrets = secretProvider.getAllSecrets(); diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestSigner.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestSigner.java index 1e2c960a92..c6a7710571 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestSigner.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestSigner.java @@ -14,6 +14,8 @@ package org.apache.hadoop.security.authentication.util; import java.util.Properties; +import javax.servlet.ServletContext; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.junit.Assert; import org.junit.Test; @@ -21,7 +23,7 @@ public class TestSigner { @Test public void testNullAndEmptyString() throws Exception { - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + Signer signer = new Signer(createStringSignerSecretProvider()); try { signer.sign(null); Assert.fail(); @@ -42,7 +44,7 @@ public void testNullAndEmptyString() throws Exception { @Test public void testSignature() throws Exception { - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + Signer signer = new Signer(createStringSignerSecretProvider()); String s1 = signer.sign("ok"); String s2 = signer.sign("ok"); String s3 = signer.sign("wrong"); @@ -52,7 +54,7 @@ public void testSignature() throws Exception { @Test public void testVerify() throws Exception { - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + Signer signer = new Signer(createStringSignerSecretProvider()); String t = "test"; String s = signer.sign(t); String e = signer.verifyAndExtract(s); @@ -61,7 
+63,7 @@ public void testVerify() throws Exception { @Test public void testInvalidSignedText() throws Exception { - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + Signer signer = new Signer(createStringSignerSecretProvider()); try { signer.verifyAndExtract("test"); Assert.fail(); @@ -74,7 +76,7 @@ public void testInvalidSignedText() throws Exception { @Test public void testTampering() throws Exception { - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + Signer signer = new Signer(createStringSignerSecretProvider()); String t = "test"; String s = signer.sign(t); s += "x"; @@ -88,6 +90,14 @@ public void testTampering() throws Exception { } } + private StringSignerSecretProvider createStringSignerSecretProvider() throws Exception { + StringSignerSecretProvider secretProvider = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty(AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, -1); + return secretProvider; + } + @Test public void testMultipleSecrets() throws Exception { TestSignerSecretProvider secretProvider = new TestSignerSecretProvider(); @@ -128,7 +138,8 @@ class TestSignerSecretProvider extends SignerSecretProvider { private byte[] previousSecret; @Override - public void init(Properties config, long tokenValidity) { + public void init(Properties config, ServletContext servletContext, + long tokenValidity) { } @Override diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestStringSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestStringSignerSecretProvider.java index c1170060ba..d8b044dcd2 100644 --- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestStringSignerSecretProvider.java +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestStringSignerSecretProvider.java @@ -13,6 +13,8 @@ */ package org.apache.hadoop.security.authentication.util; +import java.util.Properties; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.junit.Assert; import org.junit.Test; @@ -22,8 +24,11 @@ public class TestStringSignerSecretProvider { public void testGetSecrets() throws Exception { String secretStr = "secret"; StringSignerSecretProvider secretProvider - = new StringSignerSecretProvider(secretStr); - secretProvider.init(null, -1); + = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty( + AuthenticationFilter.SIGNATURE_SECRET, "secret"); + secretProvider.init(secretProviderProps, null, -1); byte[] secretBytes = secretStr.getBytes(); Assert.assertArrayEquals(secretBytes, secretProvider.getCurrentSecret()); byte[][] allSecrets = secretProvider.getAllSecrets(); diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestZKSignerSecretProvider.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestZKSignerSecretProvider.java new file mode 100644 index 0000000000..d7b6e17e11 --- /dev/null +++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/util/TestZKSignerSecretProvider.java @@ -0,0 +1,270 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. See accompanying LICENSE file. + */ +package org.apache.hadoop.security.authentication.util; + +import java.util.Arrays; +import java.util.Properties; +import java.util.Random; +import javax.servlet.ServletContext; +import org.apache.curator.test.TestingServer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestZKSignerSecretProvider { + + private TestingServer zkServer; + + @Before + public void setup() throws Exception { + zkServer = new TestingServer(); + } + + @After + public void teardown() throws Exception { + if (zkServer != null) { + zkServer.stop(); + zkServer.close(); + } + } + + @Test + // Test just one ZKSignerSecretProvider to verify that it works in the + // simplest case + public void testOne() throws Exception { + long rolloverFrequency = 15 * 1000; // rollover every 15 sec + // use the same seed so we can predict the RNG + long seed = System.currentTimeMillis(); + Random rand = new Random(seed); + byte[] secret2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secret1 = Long.toString(rand.nextLong()).getBytes(); + byte[] secret3 = Long.toString(rand.nextLong()).getBytes(); + ZKSignerSecretProvider secretProvider = new ZKSignerSecretProvider(seed); + Properties config = new Properties(); + config.setProperty( + ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING, + zkServer.getConnectString()); + config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH, + "/secret"); + try { + secretProvider.init(config, getDummyServletContext(), rolloverFrequency); + + byte[] currentSecret = secretProvider.getCurrentSecret(); + byte[][] allSecrets = secretProvider.getAllSecrets(); + Assert.assertArrayEquals(secret1, currentSecret); + Assert.assertEquals(2, allSecrets.length); + Assert.assertArrayEquals(secret1, allSecrets[0]); + Assert.assertNull(allSecrets[1]); + Thread.sleep((rolloverFrequency + 2000)); + + currentSecret = secretProvider.getCurrentSecret(); + allSecrets = secretProvider.getAllSecrets(); + Assert.assertArrayEquals(secret2, currentSecret); + Assert.assertEquals(2, allSecrets.length); + Assert.assertArrayEquals(secret2, allSecrets[0]); + Assert.assertArrayEquals(secret1, allSecrets[1]); + Thread.sleep((rolloverFrequency + 2000)); + + currentSecret = secretProvider.getCurrentSecret(); + allSecrets = secretProvider.getAllSecrets(); + Assert.assertArrayEquals(secret3, currentSecret); + Assert.assertEquals(2, allSecrets.length); + Assert.assertArrayEquals(secret3, allSecrets[0]); + Assert.assertArrayEquals(secret2, allSecrets[1]); + Thread.sleep((rolloverFrequency + 2000)); + } finally { + secretProvider.destroy(); + } + } + + @Test + public void testMultipleInit() throws Exception { + long rolloverFrequency = 15 * 1000; // rollover every 15 sec + // use the same seed so we can predict the RNG + long seedA = System.currentTimeMillis(); + Random rand = new Random(seedA); + byte[] secretA2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretA1 = 
Long.toString(rand.nextLong()).getBytes(); + // use the same seed so we can predict the RNG + long seedB = System.currentTimeMillis() + rand.nextLong(); + rand = new Random(seedB); + byte[] secretB2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretB1 = Long.toString(rand.nextLong()).getBytes(); + // use the same seed so we can predict the RNG + long seedC = System.currentTimeMillis() + rand.nextLong(); + rand = new Random(seedC); + byte[] secretC2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretC1 = Long.toString(rand.nextLong()).getBytes(); + ZKSignerSecretProvider secretProviderA = new ZKSignerSecretProvider(seedA); + ZKSignerSecretProvider secretProviderB = new ZKSignerSecretProvider(seedB); + ZKSignerSecretProvider secretProviderC = new ZKSignerSecretProvider(seedC); + Properties config = new Properties(); + config.setProperty( + ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING, + zkServer.getConnectString()); + config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH, + "/secret"); + try { + secretProviderA.init(config, getDummyServletContext(), rolloverFrequency); + secretProviderB.init(config, getDummyServletContext(), rolloverFrequency); + secretProviderC.init(config, getDummyServletContext(), rolloverFrequency); + + byte[] currentSecretA = secretProviderA.getCurrentSecret(); + byte[][] allSecretsA = secretProviderA.getAllSecrets(); + byte[] currentSecretB = secretProviderB.getCurrentSecret(); + byte[][] allSecretsB = secretProviderB.getAllSecrets(); + byte[] currentSecretC = secretProviderC.getCurrentSecret(); + byte[][] allSecretsC = secretProviderC.getAllSecrets(); + Assert.assertArrayEquals(currentSecretA, currentSecretB); + Assert.assertArrayEquals(currentSecretB, currentSecretC); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertEquals(2, allSecretsB.length); + Assert.assertEquals(2, allSecretsC.length); + Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]); + Assert.assertArrayEquals(allSecretsB[0], allSecretsC[0]); + Assert.assertNull(allSecretsA[1]); + Assert.assertNull(allSecretsB[1]); + Assert.assertNull(allSecretsC[1]); + char secretChosen = 'z'; + if (Arrays.equals(secretA1, currentSecretA)) { + Assert.assertArrayEquals(secretA1, allSecretsA[0]); + secretChosen = 'A'; + } else if (Arrays.equals(secretB1, currentSecretB)) { + Assert.assertArrayEquals(secretB1, allSecretsA[0]); + secretChosen = 'B'; + }else if (Arrays.equals(secretC1, currentSecretC)) { + Assert.assertArrayEquals(secretC1, allSecretsA[0]); + secretChosen = 'C'; + } else { + Assert.fail("It appears that they all agreed on the same secret, but " + + "not one of the secrets they were supposed to"); + } + Thread.sleep((rolloverFrequency + 2000)); + + currentSecretA = secretProviderA.getCurrentSecret(); + allSecretsA = secretProviderA.getAllSecrets(); + currentSecretB = secretProviderB.getCurrentSecret(); + allSecretsB = secretProviderB.getAllSecrets(); + currentSecretC = secretProviderC.getCurrentSecret(); + allSecretsC = secretProviderC.getAllSecrets(); + Assert.assertArrayEquals(currentSecretA, currentSecretB); + Assert.assertArrayEquals(currentSecretB, currentSecretC); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertEquals(2, allSecretsB.length); + Assert.assertEquals(2, allSecretsC.length); + Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]); + Assert.assertArrayEquals(allSecretsB[0], allSecretsC[0]); + Assert.assertArrayEquals(allSecretsA[1], allSecretsB[1]); + Assert.assertArrayEquals(allSecretsB[1], allSecretsC[1]); + // The second secret 
used is prechosen by whoever won the init; so it + // should match with whichever we saw before + if (secretChosen == 'A') { + Assert.assertArrayEquals(secretA2, currentSecretA); + } else if (secretChosen == 'B') { + Assert.assertArrayEquals(secretB2, currentSecretA); + } else if (secretChosen == 'C') { + Assert.assertArrayEquals(secretC2, currentSecretA); + } + } finally { + secretProviderC.destroy(); + secretProviderB.destroy(); + secretProviderA.destroy(); + } + } + + @Test + public void testMultipleUnsynchronized() throws Exception { + long rolloverFrequency = 15 * 1000; // rollover every 15 sec + // use the same seed so we can predict the RNG + long seedA = System.currentTimeMillis(); + Random rand = new Random(seedA); + byte[] secretA2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretA1 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretA3 = Long.toString(rand.nextLong()).getBytes(); + // use the same seed so we can predict the RNG + long seedB = System.currentTimeMillis() + rand.nextLong(); + rand = new Random(seedB); + byte[] secretB2 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretB1 = Long.toString(rand.nextLong()).getBytes(); + byte[] secretB3 = Long.toString(rand.nextLong()).getBytes(); + ZKSignerSecretProvider secretProviderA = new ZKSignerSecretProvider(seedA); + ZKSignerSecretProvider secretProviderB = new ZKSignerSecretProvider(seedB); + Properties config = new Properties(); + config.setProperty( + ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING, + zkServer.getConnectString()); + config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH, + "/secret"); + try { + secretProviderA.init(config, getDummyServletContext(), rolloverFrequency); + + byte[] currentSecretA = secretProviderA.getCurrentSecret(); + byte[][] allSecretsA = secretProviderA.getAllSecrets(); + Assert.assertArrayEquals(secretA1, currentSecretA); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertArrayEquals(secretA1, allSecretsA[0]); + Assert.assertNull(allSecretsA[1]); + Thread.sleep((rolloverFrequency + 2000)); + + currentSecretA = secretProviderA.getCurrentSecret(); + allSecretsA = secretProviderA.getAllSecrets(); + Assert.assertArrayEquals(secretA2, currentSecretA); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertArrayEquals(secretA2, allSecretsA[0]); + Assert.assertArrayEquals(secretA1, allSecretsA[1]); + Thread.sleep((rolloverFrequency / 5)); + + secretProviderB.init(config, getDummyServletContext(), rolloverFrequency); + + byte[] currentSecretB = secretProviderB.getCurrentSecret(); + byte[][] allSecretsB = secretProviderB.getAllSecrets(); + Assert.assertArrayEquals(secretA2, currentSecretB); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertArrayEquals(secretA2, allSecretsB[0]); + Assert.assertArrayEquals(secretA1, allSecretsB[1]); + Thread.sleep((rolloverFrequency)); + + currentSecretA = secretProviderA.getCurrentSecret(); + allSecretsA = secretProviderA.getAllSecrets(); + currentSecretB = secretProviderB.getCurrentSecret(); + allSecretsB = secretProviderB.getAllSecrets(); + Assert.assertArrayEquals(currentSecretA, currentSecretB); + Assert.assertEquals(2, allSecretsA.length); + Assert.assertEquals(2, allSecretsB.length); + Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]); + Assert.assertArrayEquals(allSecretsA[1], allSecretsB[1]); + if (Arrays.equals(secretA3, currentSecretA)) { + Assert.assertArrayEquals(secretA3, allSecretsA[0]); + } else if (Arrays.equals(secretB3, currentSecretB)) { + Assert.assertArrayEquals(secretB3,
allSecretsA[0]); + } else { + Assert.fail("It appears that they all agreed on the same secret, but " + + "not one of the secrets they were supposed to"); + } + } finally { + secretProviderB.destroy(); + secretProviderA.destroy(); + } + } + + private ServletContext getDummyServletContext() { + ServletContext servletContext = Mockito.mock(ServletContext.class); + Mockito.when(servletContext.getAttribute(ZKSignerSecretProvider + .ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE)) + .thenReturn(null); + return servletContext; + } +} diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 6f80eb2b6a..5e57c620c1 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -325,6 +325,8 @@ Trunk (Unreleased) HADOOP-11052. hadoop_verify_secure_prereq's results aren't checked in bin/hdfs (aw) + HADOOP-11055. non-daemon pid files are missing (aw) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) @@ -517,6 +519,12 @@ Release 2.6.0 - UNRELEASED HADOOP-11074. Move s3-related FS connector code to hadoop-aws (David S. Wang via Colin Patrick McCabe) + HADOOP-11091. Eliminate old configuration parameter names from s3a (David + S. Wang via Colin Patrick McCabe) + + HADOOP-10868. AuthenticationFilter should support externalizing the + secret for signing and provide rotation support. (rkanter via tucu) + OPTIMIZATIONS HADOOP-10838. Byte array native checksumming. (James Thomas via todd) diff --git a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh index 1677cc06bf..db7dd5e2a9 100644 --- a/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh +++ b/hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh @@ -645,7 +645,7 @@ function hadoop_verify_secure_prereq # ${EUID} comes from the shell itself! if [[ "${EUID}" -ne 0 ]] && [[ -z "${HADOOP_SECURE_COMMAND}" ]]; then - hadoop_error "ERROR: You must be a privileged in order to run a secure serice." + hadoop_error "ERROR: You must be a privileged user in order to run a secure service." exit 1 else return 0 @@ -704,7 +704,8 @@ function hadoop_verify_logdir rm "${HADOOP_LOG_DIR}/$$" >/dev/null 2>&1 } -function hadoop_status_daemon() { +function hadoop_status_daemon() +{ # # LSB 4.1.0 compatible status command (1) # @@ -760,11 +761,19 @@ function hadoop_start_daemon # so complex! so wow! much java! local command=$1 local class=$2 - shift 2 + local pidfile=$3 + shift 3 hadoop_debug "Final CLASSPATH: ${CLASSPATH}" hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}" + # this is for the non-daemon pid creation + #shellcheck disable=SC2086 + echo $$ > "${pidfile}" 2>/dev/null + if [[ $? -gt 0 ]]; then + hadoop_error "ERROR: Cannot write ${command} pid ${pidfile}." + fi + export CLASSPATH #shellcheck disable=SC2086 exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@" @@ -779,27 +788,42 @@ function hadoop_start_daemon_wrapper local pidfile=$3 local outfile=$4 shift 4 - + + local counter + hadoop_rotate_log "${outfile}" hadoop_start_daemon "${daemonname}" \ - "$class" "$@" >> "${outfile}" 2>&1 < /dev/null & + "$class" \ + "${pidfile}" \ + "$@" >> "${outfile}" 2>&1 < /dev/null & + + # we need to avoid a race condition here + # so let's wait for the fork to finish + # before overriding with the daemonized pid + (( counter=0 )) + while [[ ! 
-f ${pidfile} && ${counter} -le 5 ]]; do + sleep 1 + (( counter++ )) + done + + # this is for daemon pid creation #shellcheck disable=SC2086 echo $! > "${pidfile}" 2>/dev/null if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot write pid ${pidfile}." + hadoop_error "ERROR: Cannot write ${daemonname} pid ${pidfile}." fi # shellcheck disable=SC2086 renice "${HADOOP_NICENESS}" $! >/dev/null 2>&1 if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot set priority of process $!" + hadoop_error "ERROR: Cannot set priority of ${daemonname} process $!" fi # shellcheck disable=SC2086 - disown $! 2>&1 + disown %+ >/dev/null 2>&1 if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot disconnect process $!" + hadoop_error "ERROR: Cannot disconnect ${daemonname} process $!" fi sleep 1 @@ -829,7 +853,8 @@ function hadoop_start_secure_daemon # where to send stderr. same thing, except &2 = stderr local daemonerrfile=$5 - shift 5 + local privpidfile=$6 + shift 6 hadoop_rotate_log "${daemonoutfile}" hadoop_rotate_log "${daemonerrfile}" @@ -849,17 +874,23 @@ function hadoop_start_secure_daemon hadoop_debug "Final CLASSPATH: ${CLASSPATH}" hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}" + + #shellcheck disable=SC2086 + echo $$ > "${privpidfile}" 2>/dev/null + if [[ $? -gt 0 ]]; then + hadoop_error "ERROR: Cannot write ${daemonname} pid ${privpidfile}." + fi exec "${jsvc}" \ - "-Dproc_${daemonname}" \ - -outfile "${daemonoutfile}" \ - -errfile "${daemonerrfile}" \ - -pidfile "${daemonpidfile}" \ - -nodetach \ - -user "${HADOOP_SECURE_USER}" \ - -cp "${CLASSPATH}" \ - ${HADOOP_OPTS} \ - "${class}" "$@" + "-Dproc_${daemonname}" \ + -outfile "${daemonoutfile}" \ + -errfile "${daemonerrfile}" \ + -pidfile "${daemonpidfile}" \ + -nodetach \ + -user "${HADOOP_SECURE_USER}" \ + -cp "${CLASSPATH}" \ + ${HADOOP_OPTS} \ + "${class}" "$@" } function hadoop_start_secure_daemon_wrapper @@ -886,39 +917,52 @@ function hadoop_start_secure_daemon_wrapper local daemonerrfile=$7 shift 7 + + local counter hadoop_rotate_log "${jsvcoutfile}" hadoop_start_secure_daemon \ - "${daemonname}" \ - "${class}" \ - "${daemonpidfile}" \ - "${daemonoutfile}" \ - "${daemonerrfile}" "$@" >> "${jsvcoutfile}" 2>&1 < /dev/null & - - # This wrapper should only have one child. Unlike Shawty Lo. + "${daemonname}" \ + "${class}" \ + "${daemonpidfile}" \ + "${daemonoutfile}" \ + "${daemonerrfile}" \ + "${jsvcpidfile}" "$@" >> "${jsvcoutfile}" 2>&1 < /dev/null & + + # we need to avoid a race condition here + # so let's wait for the fork to finish + # before overriding with the daemonized pid + (( counter=0 )) + while [[ ! -f ${pidfile} && ${counter} -le 5 ]]; do + sleep 1 + (( counter++ )) + done + + # this is for the daemon pid creation #shellcheck disable=SC2086 echo $! > "${jsvcpidfile}" 2>/dev/null if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot write pid ${pidfile}." + hadoop_error "ERROR: Cannot write ${daemonname} pid ${pidfile}." fi + sleep 1 #shellcheck disable=SC2086 renice "${HADOOP_NICENESS}" $! >/dev/null 2>&1 if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot set priority of process $!" + hadoop_error "ERROR: Cannot set priority of ${daemonname} process $!" fi if [[ -f "${daemonpidfile}" ]]; then #shellcheck disable=SC2046 - renice "${HADOOP_NICENESS}" $(cat "${daemonpidfile}") >/dev/null 2>&1 + renice "${HADOOP_NICENESS}" $(cat "${daemonpidfile}" 2>/dev/null) >/dev/null 2>&1 if [[ $?
-gt 0 ]]; then - hadoop_error "ERROR: Cannot set priority of process $(cat "${daemonpidfile}")" + hadoop_error "ERROR: Cannot set priority of ${daemonname} process $(cat "${daemonpidfile}" 2>/dev/null)" fi fi - #shellcheck disable=SC2086 - disown $! 2>&1 + #shellcheck disable=SC2046 + disown %+ >/dev/null 2>&1 if [[ $? -gt 0 ]]; then - hadoop_error "ERROR: Cannot disconnect process $!" + hadoop_error "ERROR: Cannot disconnect ${daemonname} process $!" fi # capture the ulimit output su "${HADOOP_SECURE_USER}" -c 'bash -c "ulimit -a"' >> "${jsvcoutfile}" 2>&1 @@ -994,7 +1038,7 @@ function hadoop_daemon_handler hadoop_verify_logdir hadoop_status_daemon "${daemon_pidfile}" if [[ $? == 0 ]]; then - hadoop_error "${daemonname} running as process $(cat "${daemon_pidfile}"). Stop it first." + hadoop_error "${daemonname} is running as process $(cat "${daemon_pidfile}"). Stop it first." exit 1 else # stale pid file, so just remove it and continue on @@ -1003,7 +1047,7 @@ function hadoop_daemon_handler ##COMPAT - differenticate between --daemon start and nothing # "nothing" shouldn't detach if [[ "$daemonmode" = "default" ]]; then - hadoop_start_daemon "${daemonname}" "${class}" "$@" + hadoop_start_daemon "${daemonname}" "${class}" "${daemon_pidfile}" "$@" else hadoop_start_daemon_wrapper "${daemonname}" \ "${class}" "${daemon_pidfile}" "${daemon_outfile}" "$@" @@ -1042,7 +1086,7 @@ function hadoop_secure_daemon_handler hadoop_verify_logdir hadoop_status_daemon "${daemon_pidfile}" if [[ $? == 0 ]]; then - hadoop_error "${daemonname} running as process $(cat "${daemon_pidfile}"). Stop it first." + hadoop_error "${daemonname} is running as process $(cat "${daemon_pidfile}"). Stop it first." exit 1 else # stale pid file, so just remove it and continue on @@ -1054,7 +1098,7 @@ function hadoop_secure_daemon_handler if [[ "${daemonmode}" = "default" ]]; then hadoop_start_secure_daemon "${daemonname}" "${classname}" \ "${daemon_pidfile}" "${daemon_outfile}" \ - "${priv_errfile}" "$@" + "${priv_errfile}" "${priv_pidfile}" "$@" else hadoop_start_secure_daemon_wrapper "${daemonname}" "${classname}" \ "${daemon_pidfile}" "${daemon_outfile}" \ diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java index c6c0d19d2a..763d168d19 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/TestHttpFSServer.java @@ -66,6 +66,8 @@ import org.mortbay.jetty.webapp.WebAppContext; import com.google.common.collect.Maps; +import java.util.Properties; +import org.apache.hadoop.security.authentication.server.AuthenticationFilter; import org.apache.hadoop.security.authentication.util.StringSignerSecretProvider; public class TestHttpFSServer extends HFSTestCase { @@ -685,7 +687,11 @@ public void testDelegationTokenOperations() throws Exception { new AuthenticationToken("u", "p", new KerberosDelegationTokenAuthenticationHandler().getType()); token.setExpires(System.currentTimeMillis() + 100000000); - Signer signer = new Signer(new StringSignerSecretProvider("secret")); + StringSignerSecretProvider secretProvider = new StringSignerSecretProvider(); + Properties secretProviderProps = new Properties(); + secretProviderProps.setProperty(AuthenticationFilter.SIGNATURE_SECRET, "secret"); + 
secretProvider.init(secretProviderProps, null, -1); + Signer signer = new Signer(secretProvider); String tokenSigned = signer.sign(token.toString()); url = new URL(TestJettyHelper.getJettyURL(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f596265f4e..1225311ea2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -461,6 +461,9 @@ Release 2.6.0 - UNRELEASED HDFS-7059. HAadmin transtionToActive with forceActive option can show confusing message. + HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims + via cmccabe) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) @@ -653,6 +656,9 @@ Release 2.6.0 - UNRELEASED HDFS-7032. Add WebHDFS support for reading and writing to encryption zones. (clamb via wang) + HDFS-6965. NN continues to issue block locations for DNs with full disks. + (Rushabh Shah via kihwal) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index ed1abd8fc2..6ff306c8a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -88,6 +88,10 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; + import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -356,13 +360,23 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception { /** Append on an existing block? 
*/ private final boolean isAppend; + private final Span traceSpan; + /** * Default construction for file create */ private DataStreamer(HdfsFileStatus stat) { + this(stat, null); + } + + /** + * construction with tracing info + */ + private DataStreamer(HdfsFileStatus stat, Span span) { isAppend = false; isLazyPersistFile = stat.isLazyPersist(); stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; + traceSpan = span; } /** @@ -373,9 +387,10 @@ private DataStreamer(HdfsFileStatus stat) { * @throws IOException if error occurs */ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, - int bytesPerChecksum) throws IOException { + int bytesPerChecksum, Span span) throws IOException { isAppend = true; stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; + traceSpan = span; block = lastBlock.getBlock(); bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); @@ -466,6 +481,10 @@ private void endBlock() { @Override public void run() { long lastPacket = Time.now(); + TraceScope traceScope = null; + if (traceSpan != null) { + traceScope = Trace.continueSpan(traceSpan); + } while (!streamerClosed && dfsClient.clientRunning) { // if the Responder encountered an error, shutdown Responder @@ -639,6 +658,9 @@ public void run() { } } } + if (traceScope != null) { + traceScope.close(); + } closeInternal(); } @@ -1614,7 +1636,11 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum()); - streamer = new DataStreamer(stat); + Span traceSpan = null; + if (Trace.isTracing()) { + traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach(); + } + streamer = new DataStreamer(stat, traceSpan); if (favoredNodes != null && favoredNodes.length != 0) { streamer.setFavoredNodes(favoredNodes); } @@ -1655,15 +1681,21 @@ private DFSOutputStream(DFSClient dfsClient, String src, this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened + Span traceSpan = null; + if (Trace.isTracing()) { + traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach(); + } + // The last partial block of the file has to be filled. 
if (lastBlock != null) { // indicate that we are appending to an existing block bytesCurBlock = lastBlock.getBlockSize(); - streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum()); + streamer = new DataStreamer(lastBlock, stat, + checksum.getBytesPerChecksum(), traceSpan); } else { computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum()); - streamer = new DataStreamer(stat); + streamer = new DataStreamer(stat, traceSpan); } this.fileEncryptionInfo = stat.getFileEncryptionInfo(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java index 6be3810c91..b91e17a3b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -25,12 +25,16 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; - +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceInfo; +import org.htrace.TraceScope; /** * Static utilities for dealing with the protocol buffers used by the @@ -78,9 +82,41 @@ static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token blockToken) { - return BaseHeaderProto.newBuilder() + BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() .setBlock(PBHelper.convert(blk)) - .setToken(PBHelper.convert(blockToken)) - .build(); + .setToken(PBHelper.convert(blockToken)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()) + .setParentId(s.getSpanId())); + } + return builder.build(); + } + + public static TraceInfo fromProto(DataTransferTraceInfoProto proto) { + if (proto == null) return null; + if (!proto.hasTraceId()) return null; + return new TraceInfo(proto.getTraceId(), proto.getParentId()); + } + + public static TraceScope continueTraceSpan(ClientOperationHeaderProto header, + String description) { + return continueTraceSpan(header.getBaseHeader(), description); + } + + public static TraceScope continueTraceSpan(BaseHeaderProto header, + String description) { + return continueTraceSpan(header.getTraceInfo(), description); + } + + public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, + String description) { + TraceScope scope = null; + TraceInfo info = fromProto(proto); + if (info != null) { + scope = Trace.startSpan(description, info); + } + return scope; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 78693bb8e4..8192925166 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; +import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import java.io.DataInputStream; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.htrace.TraceScope; /** Receiver */ @InterfaceAudience.Private @@ -108,7 +110,10 @@ static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) /** Receive OP_READ_BLOCK */ private void opReadBlock() throws IOException { OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in)); - readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), proto.getOffset(), @@ -117,28 +122,37 @@ private void opReadBlock() throws IOException { (proto.hasCachingStrategy() ? getCachingStrategy(proto.getCachingStrategy()) : CachingStrategy.newDefaultStrategy())); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); - writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), - PBHelper.convertStorageType(proto.getStorageType()), - PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), - proto.getHeader().getClientName(), - targets, - PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), - PBHelper.convert(proto.getSource()), - fromProto(proto.getStage()), - proto.getPipelineSize(), - proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(), - proto.getLatestGenerationStamp(), - fromProto(proto.getRequestedChecksum()), - (proto.hasCachingStrategy() ? - getCachingStrategy(proto.getCachingStrategy()) : - CachingStrategy.newDefaultStrategy()), - (proto.hasAllowLazyPersist() ? 
proto.getAllowLazyPersist() : false)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), + PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), + proto.getHeader().getClientName(), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), + PBHelper.convert(proto.getSource()), + fromProto(proto.getStage()), + proto.getPipelineSize(), + proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(), + proto.getLatestGenerationStamp(), + fromProto(proto.getRequestedChecksum()), + (proto.hasCachingStrategy() ? + getCachingStrategy(proto.getCachingStrategy()) : + CachingStrategy.newDefaultStrategy()), + (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false)); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive {@link Op#TRANSFER_BLOCK} */ @@ -146,11 +160,17 @@ private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); - transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), - PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), - proto.getHeader().getClientName(), - targets, - PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), + proto.getHeader().getClientName(), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -159,9 +179,15 @@ private void opRequestShortCircuitFds(DataInputStream in) throws IOException { OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in)); SlotId slotId = (proto.hasSlotId()) ? 
PBHelper.convert(proto.getSlotId()) : null; - requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), - PBHelper.convert(proto.getHeader().getToken()), - slotId, proto.getMaxVersion()); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convert(proto.getHeader().getToken()), + slotId, proto.getMaxVersion()); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */ @@ -169,38 +195,67 @@ private void opReleaseShortCircuitFds(DataInputStream in) throws IOException { final ReleaseShortCircuitAccessRequestProto proto = ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in)); - releaseShortCircuitFds(PBHelper.convert(proto.getSlotId())); + TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(), + proto.getClass().getSimpleName()); + try { + releaseShortCircuitFds(PBHelper.convert(proto.getSlotId())); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */ private void opRequestShortCircuitShm(DataInputStream in) throws IOException { final ShortCircuitShmRequestProto proto = ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in)); - requestShortCircuitShm(proto.getClientName()); + TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(), + proto.getClass().getSimpleName()); + try { + requestShortCircuitShm(proto.getClientName()); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive OP_REPLACE_BLOCK */ private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); - replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), - PBHelper.convertStorageType(proto.getStorageType()), - PBHelper.convert(proto.getHeader().getToken()), - proto.getDelHint(), - PBHelper.convert(proto.getSource())); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), + PBHelper.convert(proto.getHeader().getToken()), + proto.getDelHint(), + PBHelper.convert(proto.getSource())); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive OP_COPY_BLOCK */ private void opCopyBlock(DataInputStream in) throws IOException { OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in)); - copyBlock(PBHelper.convert(proto.getHeader().getBlock()), - PBHelper.convert(proto.getHeader().getToken())); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { + copyBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convert(proto.getHeader().getToken())); + } finally { + if (traceScope != null) traceScope.close(); + } } /** Receive OP_BLOCK_CHECKSUM */ private void opBlockChecksum(DataInputStream in) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in)); - + TraceScope traceScope = continueTraceSpan(proto.getHeader(), + proto.getClass().getSimpleName()); + try { blockChecksum(PBHelper.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken())); + } finally { + if (traceScope != null) traceScope.close(); + } } } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 4298bb0692..1ae9da53ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; @@ -47,6 +48,9 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; +import org.htrace.Trace; +import org.htrace.Span; + import com.google.protobuf.Message; /** Sender */ @@ -187,19 +191,29 @@ public void requestShortCircuitFds(final ExtendedBlock blk, @Override public void releaseShortCircuitFds(SlotId slotId) throws IOException { - ReleaseShortCircuitAccessRequestProto proto = + ReleaseShortCircuitAccessRequestProto.Builder builder = ReleaseShortCircuitAccessRequestProto.newBuilder(). - setSlotId(PBHelper.convert(slotId)). - build(); + setSlotId(PBHelper.convert(slotId)); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ReleaseShortCircuitAccessRequestProto proto = builder.build(); send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); } @Override public void requestShortCircuitShm(String clientName) throws IOException { - ShortCircuitShmRequestProto proto = + ShortCircuitShmRequestProto.Builder builder = ShortCircuitShmRequestProto.newBuilder(). - setClientName(clientName). 
- build(); + setClientName(clientName); + if (Trace.isTracing()) { + Span s = Trace.currentSpan(); + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() + .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + } + ShortCircuitShmRequestProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index e2026c1dfb..f77d4ab563 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -635,7 +635,7 @@ private boolean isGoodTarget(DatanodeStorageInfo storage, final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE; final long scheduledSize = blockSize * node.getBlocksScheduled(); - if (requiredSize > node.getRemaining() - scheduledSize) { + if (requiredSize > storage.getRemaining() - scheduledSize) { logNodeIsNotChosen(storage, "the node does not have enough space "); return false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 13747ab558..fb774b78fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto { message BaseHeaderProto { required ExtendedBlockProto block = 1; optional hadoop.common.TokenProto token = 2; + optional DataTransferTraceInfoProto traceInfo = 3; +} + +message DataTransferTraceInfoProto { + required uint64 traceId = 1; + required uint64 parentId = 2; } message ClientOperationHeaderProto { @@ -173,6 +179,7 @@ message OpRequestShortCircuitAccessProto { message ReleaseShortCircuitAccessRequestProto { required ShortCircuitShmSlotProto slotId = 1; + optional DataTransferTraceInfoProto traceInfo = 2; } message ReleaseShortCircuitAccessResponseProto { @@ -184,6 +191,7 @@ message ShortCircuitShmRequestProto { // The name of the client requesting the shared memory segment. This is // purely for logging / debugging purposes. 
required string clientName = 1; + optional DataTransferTraceInfoProto traceInfo = 2; } message ShortCircuitShmResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 41af2370d1..1a8262fb6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -35,17 +35,24 @@ import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -599,5 +606,53 @@ public void testSafeModeIBRAfterIncremental() throws Exception { new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); } + + + /** + * Tests that a namenode doesn't choose a datanode with full disks to + * store blocks. + * @throws Exception + */ + @Test + public void testStorageWithRemainingCapacity() throws Exception { + final Configuration conf = new HdfsConfiguration(); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + FileSystem fs = FileSystem.get(conf); + Path file1 = null; + try { + cluster.waitActive(); + final FSNamesystem namesystem = cluster.getNamesystem(); + final String poolId = namesystem.getBlockPoolId(); + final DatanodeRegistration nodeReg = + DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes(). + get(0), poolId); + final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, + nodeReg); + // By default, MiniDFSCluster will create 1 datanode with 2 storages. + // Assigning 64k for remaining storage capacity and will + //create a file with 100k. 
+ for(DatanodeStorageInfo storage: dd.getStorageInfos()) { + storage.setUtilizationForTesting(65536, 0, 65536, 0); + } + //sum of the remaining capacity of both the storages + dd.setRemaining(131072); + file1 = new Path("testRemainingStorage.dat"); + try { + DFSTestUtil.createFile(fs, file1, 102400, 102400, 102400, (short)1, + 0x1BAD5EED); + } + catch (RemoteException re) { + GenericTestUtils.assertExceptionContains("nodes instead of " + + "minReplication", re); + } + } + finally { + // Clean up + assertTrue(fs.exists(file1)); + fs.delete(file1, true); + assertTrue(!fs.exists(file1)); + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java index bb923a2c6b..b3e6ee8e3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracing.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.test.GenericTestUtils; import org.htrace.HTraceConfiguration; import org.htrace.Sampler; import org.htrace.Span; @@ -39,11 +40,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; + +import com.google.common.base.Supplier; public class TestTracing { @@ -81,7 +84,12 @@ public void testWriteTraceHooks() throws Exception { "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create", "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync", "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync", - "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete" + "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete", + "DFSOutputStream", + "OpWriteBlockProto", + "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock", + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock" }; assertSpanNamesFound(expectedSpanNames); @@ -96,7 +104,7 @@ public void testWriteTraceHooks() throws Exception { // There should only be one trace id as it should all be homed in the // top trace. 
- for (Span span : SetSpanReceiver.SetHolder.spans) { + for (Span span : SetSpanReceiver.SetHolder.spans.values()) { Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); } } @@ -152,7 +160,8 @@ public void testReadTraceHooks() throws Exception { String[] expectedSpanNames = { "testReadTraceHooks", "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations", - "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations" + "org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations", + "OpReadBlockProto" }; assertSpanNamesFound(expectedSpanNames); @@ -168,7 +177,7 @@ public void testReadTraceHooks() throws Exception { // There should only be one trace id as it should all be homed in the // top trace. - for (Span span : SetSpanReceiver.SetHolder.spans) { + for (Span span : SetSpanReceiver.SetHolder.spans.values()) { Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); } } @@ -228,10 +237,24 @@ public static void shutDown() throws IOException { cluster.shutdown(); } - private void assertSpanNamesFound(String[] expectedSpanNames) { - Map> map = SetSpanReceiver.SetHolder.getMap(); - for (String spanName : expectedSpanNames) { - Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null); + static void assertSpanNamesFound(final String[] expectedSpanNames) { + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + Map> map = SetSpanReceiver.SetHolder.getMap(); + for (String spanName : expectedSpanNames) { + if (!map.containsKey(spanName)) { + return false; + } + } + return true; + } + }, 100, 1000); + } catch (TimeoutException e) { + Assert.fail("timed out to get expected spans: " + e.getMessage()); + } catch (InterruptedException e) { + Assert.fail("interrupted while waiting spans: " + e.getMessage()); } } @@ -249,15 +272,16 @@ public void configure(HTraceConfiguration conf) { } public void receiveSpan(Span span) { - SetHolder.spans.add(span); + SetHolder.spans.put(span.getSpanId(), span); } public void close() { } public static class SetHolder { - public static Set spans = new HashSet(); - + public static ConcurrentHashMap spans = + new ConcurrentHashMap(); + public static int size() { return spans.size(); } @@ -265,7 +289,7 @@ public static int size() { public static Map> getMap() { Map> map = new HashMap>(); - for (Span s : spans) { + for (Span s : spans.values()) { List l = map.get(s.getDescription()); if (l == null) { l = new LinkedList(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java new file mode 100644 index 0000000000..7fe8a1eab1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tracing/TestTracingShortCircuitLocalRead.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tracing; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.htrace.Sampler; +import org.htrace.Span; +import org.htrace.Trace; +import org.htrace.TraceScope; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.IOException; + +public class TestTracingShortCircuitLocalRead { + private static Configuration conf; + private static MiniDFSCluster cluster; + private static DistributedFileSystem dfs; + private static SpanReceiverHost spanReceiverHost; + private static TemporarySocketDirectory sockDir; + static final Path TEST_PATH = new Path("testShortCircuitTraceHooks"); + static final int TEST_LENGTH = 1234; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + @AfterClass + public static void shutdown() throws IOException { + sockDir.close(); + } + + @Test + public void testShortCircuitTraceHooks() throws IOException { + conf = new Configuration(); + conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, + TestTracing.SetSpanReceiver.class.getName()); + conf.setLong("dfs.blocksize", 100 * 1024); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + "testShortCircuitTraceHooks._PORT"); + conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + dfs = cluster.getFileSystem(); + + try { + spanReceiverHost = SpanReceiverHost.getInstance(conf); + DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L); + + TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS); + FSDataInputStream stream = dfs.open(TEST_PATH); + byte buf[] = new byte[TEST_LENGTH]; + IOUtils.readFully(stream, buf, 0, TEST_LENGTH); + stream.close(); + ts.close(); + + String[] expectedSpanNames = { + "OpRequestShortCircuitAccessProto", + "ShortCircuitShmRequestProto" + }; + TestTracing.assertSpanNamesFound(expectedSpanNames); + } finally { + dfs.close(); + cluster.shutdown(); + } + } +} diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 502655f709..0f662a2049 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -849,6 +849,17 @@ xercesImpl 2.9.1 + + + org.apache.curator + curator-framework + 2.6.0 + + + org.apache.curator + curator-test + 2.6.0 + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 9723b82724..26b7ddd6d6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -21,46 +21,37 @@ public class Constants { // s3 access key - public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId"; - public static final String NEW_ACCESS_KEY = "fs.s3a.access.key"; + public static final String ACCESS_KEY = "fs.s3a.access.key"; // s3 secret key - public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey"; - public static final String NEW_SECRET_KEY = "fs.s3a.secret.key"; + public static final String SECRET_KEY = "fs.s3a.secret.key"; // number of simultaneous connections to s3 - public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections"; - public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; + public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum"; public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15; // connect to s3 over ssl? - public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections"; - public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; + public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled"; public static final boolean DEFAULT_SECURE_CONNECTIONS = true; // number of times we should retry errors - public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries"; - public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; + public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum"; public static final int DEFAULT_MAX_ERROR_RETRIES = 10; // seconds until we give up on a connection to s3 - public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout"; - public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; + public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout"; public static final int DEFAULT_SOCKET_TIMEOUT = 50000; // number of records to get while paging through a directory listing - public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys"; - public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; + public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum"; public static final int DEFAULT_MAX_PAGING_KEYS = 5000; // size of each of or multipart pieces in bytes - public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize"; - public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size"; + public static final String MULTIPART_SIZE = "fs.s3a.multipart.size"; public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB // minimum size in bytes before we start a multipart uploads or copy - public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize"; - public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; + public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold"; public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE; // comma separated list of directories @@ -68,18 +59,15 @@ public class Constants { // private | public-read | public-read-write | authenticated-read | // log-delivery-write | bucket-owner-read | bucket-owner-full-control - public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL"; - public static final String NEW_CANNED_ACL = 
"fs.s3a.acl.default"; + public static final String CANNED_ACL = "fs.s3a.acl.default"; public static final String DEFAULT_CANNED_ACL = ""; // should we try to purge old multipart uploads when starting up - public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart"; - public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; + public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge"; public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false; // purge any multipart uploads older than this number of seconds - public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge"; - public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; + public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age"; public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400; // s3 server-side encryption diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a597e62275..f6d053c332 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -95,8 +95,8 @@ public void initialize(URI name, Configuration conf) throws IOException { this.getWorkingDirectory()); // Try to get our credentials or just connect anonymously - String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null)); - String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null)); + String accessKey = conf.get(ACCESS_KEY, null); + String secretKey = conf.get(SECRET_KEY, null); String userInfo = name.getUserInfo(); if (userInfo != null) { @@ -118,37 +118,33 @@ public void initialize(URI name, Configuration conf) throws IOException { bucket = name.getHost(); ClientConfiguration awsConf = new ClientConfiguration(); - awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS, - conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS))); - awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS, - conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ? - Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES, - conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES))); - awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT, - conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT))); + awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS)); + awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS) ? 
Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES)); + awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT)); s3 = new AmazonS3Client(credentials, awsConf); - maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS, - conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS)); - partSize = conf.getLong(NEW_MULTIPART_SIZE, - conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); - partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, - conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); if (partSize < 5 * 1024 * 1024) { - LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB"); + LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); partSize = 5 * 1024 * 1024; } if (partSizeThreshold < 5 * 1024 * 1024) { - LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); partSizeThreshold = 5 * 1024 * 1024; } - String cannedACLName = conf.get(NEW_CANNED_ACL, - conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL)); + String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); if (!cannedACLName.isEmpty()) { cannedACL = CannedAccessControlList.valueOf(cannedACLName); } else { @@ -159,10 +155,10 @@ public void initialize(URI name, Configuration conf) throws IOException { throw new IOException("Bucket " + bucket + " does not exist"); } - boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART, - conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART)); - long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE, - conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE)); + boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, + DEFAULT_PURGE_EXISTING_MULTIPART); + long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, + DEFAULT_PURGE_EXISTING_MULTIPART_AGE); if (purgeExistingMultipart) { TransferManager transferManager = new TransferManager(s3); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java index bdb723e920..1609b59f4d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java @@ -75,10 +75,8 @@ public S3AOutputStream(Configuration conf, AmazonS3Client client, this.statistics = statistics; this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm; - partSize = conf.getLong(NEW_MULTIPART_SIZE, - conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE)); - partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD, - conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD)); + partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); if (conf.get(BUFFER_DIR, null) != null) { lDirAlloc = new LocalDirAllocator(BUFFER_DIR); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b41ad82916..ec59cba0f5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ 
b/hadoop-yarn-project/CHANGES.txt @@ -84,6 +84,10 @@ Release 2.6.0 - UNRELEASED failures should be ignored towards counting max-attempts. (Xuan Gong via vinodkv) + YARN-2531. Added a configuration for admins to be able to override app-configs + and enforce/not-enforce strict control of per-container cpu usage. (Varun + Vasudev via vinodkv) + IMPROVEMENTS YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc @@ -225,6 +229,9 @@ Release 2.6.0 - UNRELEASED YARN-2547. Cross Origin Filter throws UnsupportedOperationException upon destroy (Mit Desai via jeagles) + YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into + DistributedShell (xgong) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a92b3586f4..44a6fc3d34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -902,6 +902,16 @@ public class YarnConfiguration extends Configuration { public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH = NM_PREFIX + "linux-container-executor.cgroups.mount-path"; + /** + * Whether the apps should run in strict resource usage mode(not allowed to + * use spare CPU) + */ + public static final String NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE = + NM_PREFIX + "linux-container-executor.cgroups.strict-resource-usage"; + public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE = + false; + + /** * Interval of time the linux container executor should try cleaning up diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index a86b52132e..f3ce64ce07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; /** * Client for Distributed Shell application submission to YARN. @@ -163,6 +162,8 @@ public class Client { // flag to indicate whether to keep containers across application attempts. 
private boolean keepContainers = false; + private long attemptFailuresValidityInterval = -1; + // Debug flag boolean debugFlag = false; @@ -248,6 +249,12 @@ public Client(Configuration conf) throws Exception { " If the flag is true, running containers will not be killed when" + " application attempt fails and these containers will be retrieved by" + " the new application attempt "); + opts.addOption("attempt_failures_validity_interval", true, + "when attempt_failures_validity_interval in milliseconds is set to > 0, " + + "attempt failures which happen outside the validity interval are not " + + "counted towards the failure count. " + + "If the failure count reaches maxAppAttempts, " + + "the application will fail."); + opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -372,6 +379,10 @@ public boolean init(String[] args) throws ParseException { clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); + attemptFailuresValidityInterval = + Long.parseLong(cliParser.getOptionValue( + "attempt_failures_validity_interval", "-1")); + log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true; @@ -456,6 +467,11 @@ public boolean run() throws IOException, YarnException { appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); + if (attemptFailuresValidityInterval >= 0) { + appContext + .setAttemptFailuresValidityInterval(attemptFailuresValidityInterval); + } + // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java new file mode 100644 index 0000000000..3004b6954e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.applications.distributedshell; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class TestDSSleepingAppMaster extends ApplicationMaster { + + private static final Log LOG = LogFactory.getLog(TestDSSleepingAppMaster.class); + private static final long SLEEP_TIME = 5000; + + public static void main(String[] args) { + boolean result = false; + try { + TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster(); + boolean doRun = appMaster.init(args); + if (!doRun) { + System.exit(0); + } + appMaster.run(); + if (appMaster.appAttemptID.getAttemptId() <= 2) { + try { + // sleep some time + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) {} + // fail the first two app attempts. + System.exit(100); + } + result = appMaster.finish(); + } catch (Throwable t) { + System.exit(1); + } + if (result) { + LOG.info("Application Master completed successfully. exiting"); + System.exit(0); + } else { + LOG.info("Application Master failed. exiting"); + System.exit(2); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index d7a174516a..6dff94c768 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -308,6 +308,82 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { Assert.assertTrue(result); } + /* + * The sleeping period in TestDSSleepingAppMaster is set to 5 seconds. + * Set attempt_failures_validity_interval to 2.5 seconds. It will check + * how many attempt failures occurred in the previous 2.5 seconds. + * The application is expected to be successful. + */ + @Test(timeout=90000) + public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + "sleep 8", + "--master_memory", + "512", + "--container_memory", + "128", + "--attempt_failures_validity_interval", + "2500" + }; + + LOG.info("Initializing DS Client"); + Configuration conf = yarnCluster.getConfig(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + Client client = new Client(TestDSSleepingAppMaster.class.getName(), + new Configuration(conf)); + + client.init(args); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + // application should succeed + Assert.assertTrue(result); + } + + /* + * The sleeping period in TestDSSleepingAppMaster is set to 5 seconds. + * Set attempt_failures_validity_interval to 15 seconds. It will check + * how many attempt failures occurred in the previous 15 seconds. + * The application is expected to fail. 
+ */ + @Test(timeout=90000) + public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { + String[] args = { + "--jar", + APPMASTER_JAR, + "--num_containers", + "1", + "--shell_command", + "sleep 8", + "--master_memory", + "512", + "--container_memory", + "128", + "--attempt_failures_validity_interval", + "15000" + }; + + LOG.info("Initializing DS Client"); + Configuration conf = yarnCluster.getConfig(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + Client client = new Client(TestDSSleepingAppMaster.class.getName(), + new Configuration(conf)); + + client.init(args); + LOG.info("Running DS Client"); + boolean result = client.run(); + + LOG.info("Client run completed. Result=" + result); + // the application should fail + Assert.assertFalse(result); + } + @Test(timeout=90000) public void testDSShellWithCustomLogPropertyFile() throws Exception { final File basedir = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 55e5bcaf5d..3a7e94ae8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1039,6 +1039,16 @@ ^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$ + + This flag determines whether apps should run with strict resource limits + or be allowed to consume spare resources if they need them. For example, turning the + flag on will restrict apps to using only their share of CPU, even if the node has spare + CPU cycles. The default value is false, i.e. use available resources. Please note that + turning this flag on may reduce job throughput on the cluster. + yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage + false + + T-file compression types used to compress aggregated logs. 
yarn.nodemanager.log-aggregation.compression-type diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java index 0b6c2ac60b..63039d8ed2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java @@ -57,6 +57,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { private String cgroupMountPath; private boolean cpuWeightEnabled = true; + private boolean strictResourceUsageMode = false; private final String MTAB_FILE = "/proc/mounts"; private final String CGROUPS_FSTYPE = "cgroup"; @@ -71,6 +72,8 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler { private long deleteCgroupTimeout; // package private for testing purposes Clock clock; + + private float yarnProcessors; public CgroupsLCEResourcesHandler() { this.controllerPaths = new HashMap(); @@ -105,6 +108,12 @@ void initConfig() throws IOException { cgroupPrefix = cgroupPrefix.substring(1); } + this.strictResourceUsageMode = + conf + .getBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, + YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE); + int len = cgroupPrefix.length(); if (cgroupPrefix.charAt(len - 1) == '/') { cgroupPrefix = cgroupPrefix.substring(0, len - 1); @@ -132,8 +141,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin) initializeControllerPaths(); // cap overall usage to the number of cores allocated to YARN - float yarnProcessors = - NodeManagerHardwareUtils.getContainersCores(plugin, conf); + yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf); int systemProcessors = plugin.getNumProcessors(); if (systemProcessors != (int) yarnProcessors) { LOG.info("YARN containers restricted to " + yarnProcessors + " cores"); @@ -290,10 +298,25 @@ private void setupLimits(ContainerId containerId, String containerName = containerId.toString(); if (isCpuWeightEnabled()) { + int containerVCores = containerResource.getVirtualCores(); createCgroup(CONTROLLER_CPU, containerName); - int cpuShares = CPU_DEFAULT_WEIGHT * containerResource.getVirtualCores(); + int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores; updateCgroup(CONTROLLER_CPU, containerName, "shares", String.valueOf(cpuShares)); + if (strictResourceUsageMode) { + int nodeVCores = + conf.getInt(YarnConfiguration.NM_VCORES, + YarnConfiguration.DEFAULT_NM_VCORES); + if (nodeVCores != containerVCores) { + float containerCPU = + (containerVCores * yarnProcessors) / (float) nodeVCores; + int[] limits = getOverallLimits(containerCPU); + updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US, + String.valueOf(limits[0])); + updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US, + String.valueOf(limits[1])); + } + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java index 4506898804..de2af37ce9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestCgroupsLCEResourcesHandler.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager.util; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.junit.Assert; @@ -86,9 +88,13 @@ static class CustomCgroupsLCEResourceHandler extends String mtabFile; int[] limits = new int[2]; + boolean generateLimitsMode = false; @Override int[] getOverallLimits(float x) { + if (generateLimitsMode == true) { + return super.getOverallLimits(x); + } return limits; } @@ -116,32 +122,11 @@ public void testInit() throws IOException { handler.initConfig(); // create mock cgroup - File cgroupDir = new File("target", UUID.randomUUID().toString()); - if (!cgroupDir.mkdir()) { - String message = "Could not create dir " + cgroupDir.getAbsolutePath(); - throw new IOException(message); - } - File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn"); - if (!cgroupMountDir.mkdir()) { - String message = - "Could not create dir " + cgroupMountDir.getAbsolutePath(); - throw new IOException(message); - } + File cgroupDir = createMockCgroup(); + File cgroupMountDir = createMockCgroupMount(cgroupDir); // create mock mtab - String mtabContent = - "none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0"; - File mockMtab = new File("target", UUID.randomUUID().toString()); - if (!mockMtab.exists()) { - if (!mockMtab.createNewFile()) { - String message = "Could not create file " + mockMtab.getAbsolutePath(); - throw new IOException(message); - } - } - FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile()); - mtabWriter.write(mtabContent); - mtabWriter.close(); - mockMtab.deleteOnExit(); + File mockMtab = createMockMTab(cgroupDir); // setup our handler and call init() handler.setMtabFile(mockMtab.getAbsolutePath()); @@ -156,7 +141,8 @@ public void testInit() throws IOException { Assert.assertFalse(quotaFile.exists()); // subset of cpu being used, files should be created - conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); + conf + .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75); handler.limits[0] = 100 * 1000; handler.limits[1] = 1000 * 1000; handler.init(mockLCE, plugin); @@ -166,7 +152,8 @@ public void testInit() throws IOException { Assert.assertEquals(1000 * 1000, quota); // set cpu back to 100, quota should be -1 - conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100); + conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, + 100); handler.limits[0] = 100 * 1000; handler.limits[1] = 1000 * 1000; handler.init(mockLCE, plugin); @@ -213,4 +200,130 @@ public void testGetOverallLimits() { Assert.assertEquals(1000 * 1000, ret[0]); Assert.assertEquals(-1, ret[1]); } + + private File createMockCgroup() throws 
IOException { + File cgroupDir = new File("target", UUID.randomUUID().toString()); + if (!cgroupDir.mkdir()) { + String message = "Could not create dir " + cgroupDir.getAbsolutePath(); + throw new IOException(message); + } + return cgroupDir; + } + + private File createMockCgroupMount(File cgroupDir) throws IOException { + File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn"); + if (!cgroupMountDir.mkdir()) { + String message = + "Could not create dir " + cgroupMountDir.getAbsolutePath(); + throw new IOException(message); + } + return cgroupMountDir; + } + + private File createMockMTab(File cgroupDir) throws IOException { + String mtabContent = + "none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0"; + File mockMtab = new File("target", UUID.randomUUID().toString()); + if (!mockMtab.exists()) { + if (!mockMtab.createNewFile()) { + String message = "Could not create file " + mockMtab.getAbsolutePath(); + throw new IOException(message); + } + } + FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile()); + mtabWriter.write(mtabContent); + mtabWriter.close(); + mockMtab.deleteOnExit(); + return mockMtab; + } + + @Test + public void testContainerLimits() throws IOException { + LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor(); + CustomCgroupsLCEResourceHandler handler = + new CustomCgroupsLCEResourceHandler(); + handler.generateLimitsMode = true; + YarnConfiguration conf = new YarnConfiguration(); + final int numProcessors = 4; + ResourceCalculatorPlugin plugin = + Mockito.mock(ResourceCalculatorPlugin.class); + Mockito.doReturn(numProcessors).when(plugin).getNumProcessors(); + handler.setConf(conf); + handler.initConfig(); + + // create mock cgroup + File cgroupDir = createMockCgroup(); + File cgroupMountDir = createMockCgroupMount(cgroupDir); + + // create mock mtab + File mockMtab = createMockMTab(cgroupDir); + + // setup our handler and call init() + handler.setMtabFile(mockMtab.getAbsolutePath()); + handler.init(mockLCE, plugin); + + // check values + // default case - files shouldn't exist, strict mode off by default + ContainerId id = ContainerId.fromString("container_1_1_1_1"); + handler.preExecute(id, Resource.newInstance(1024, 1)); + File containerDir = new File(cgroupMountDir, id.toString()); + Assert.assertTrue(containerDir.exists()); + Assert.assertTrue(containerDir.isDirectory()); + File periodFile = new File(containerDir, "cpu.cfs_period_us"); + File quotaFile = new File(containerDir, "cpu.cfs_quota_us"); + Assert.assertFalse(periodFile.exists()); + Assert.assertFalse(quotaFile.exists()); + + // no files created because we're using all cpu + FileUtils.deleteQuietly(containerDir); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); + handler.initConfig(); + handler.preExecute(id, + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES)); + Assert.assertTrue(containerDir.exists()); + Assert.assertTrue(containerDir.isDirectory()); + periodFile = new File(containerDir, "cpu.cfs_period_us"); + quotaFile = new File(containerDir, "cpu.cfs_quota_us"); + Assert.assertFalse(periodFile.exists()); + Assert.assertFalse(quotaFile.exists()); + + // 50% of CPU + FileUtils.deleteQuietly(containerDir); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); + handler.initConfig(); + handler.preExecute(id, + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Assert.assertTrue(containerDir.exists()); + 
Assert.assertTrue(containerDir.isDirectory()); + periodFile = new File(containerDir, "cpu.cfs_period_us"); + quotaFile = new File(containerDir, "cpu.cfs_quota_us"); + Assert.assertTrue(periodFile.exists()); + Assert.assertTrue(quotaFile.exists()); + Assert.assertEquals(500 * 1000, readIntFromFile(periodFile)); + Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile)); + + // CGroups set to 50% of CPU, container set to 50% of YARN CPU + FileUtils.deleteQuietly(containerDir); + conf.setBoolean( + YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true); + conf + .setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50); + handler.initConfig(); + handler.init(mockLCE, plugin); + handler.preExecute(id, + Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2)); + Assert.assertTrue(containerDir.exists()); + Assert.assertTrue(containerDir.isDirectory()); + periodFile = new File(containerDir, "cpu.cfs_period_us"); + quotaFile = new File(containerDir, "cpu.cfs_quota_us"); + Assert.assertTrue(periodFile.exists()); + Assert.assertTrue(quotaFile.exists()); + Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile)); + Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile)); + + FileUtils.deleteQuietly(cgroupDir); + } + }
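
The sketch below walks through the strict-resource-usage CPU arithmetic that testContainerLimits above exercises. It is a minimal, standalone illustration rather than the real CgroupsLCEResourcesHandler: the MAX_QUOTA_US constant and the body of overallLimits() are assumptions inferred from the values asserted in the test (period 500000 / quota 1000000 for two cores, 1000000 / 1000000 for one core), and edge cases such as the -1 "no quota" value are omitted.

// Standalone sketch of the strict-mode CPU math; constants and the
// period/quota derivation are assumptions inferred from the tests above.
public class StrictCpuLimitsSketch {
  // Assumed upper bound on cfs_period_us, matching the 1000 * 1000 values
  // asserted in the tests.
  static final int MAX_QUOTA_US = 1000 * 1000;

  // Cores a container gets in strict mode: its share of the node's vcores,
  // scaled by the number of cores actually given to YARN.
  static float containerCpu(int containerVCores, float yarnProcessors,
      int nodeVCores) {
    return (containerVCores * yarnProcessors) / (float) nodeVCores;
  }

  // Returns {cpu.cfs_period_us, cpu.cfs_quota_us} for the given core count.
  static int[] overallLimits(float cpus) {
    int quotaUS = MAX_QUOTA_US;
    int periodUS = (int) (MAX_QUOTA_US / cpus);
    if (cpus < 1.0f) {
      // Less than one core: keep the period at the maximum, shrink the quota.
      periodUS = MAX_QUOTA_US;
      quotaUS = (int) (MAX_QUOTA_US * cpus);
    }
    return new int[] { periodUS, quotaUS };
  }

  public static void main(String[] args) {
    // 4 physical cores all given to YARN, node advertises 8 vcores,
    // container asks for 4 vcores -> 2.0 cores -> period 500000, quota 1000000.
    float cpus = containerCpu(4, 4.0f, 8);
    int[] limits = overallLimits(cpus);
    System.out.println(cpus + " cores -> cpu.cfs_period_us=" + limits[0]
        + ", cpu.cfs_quota_us=" + limits[1]);
  }
}

A quota larger than the period simply means the cgroup may consume more than one CPU's worth of time in each scheduling period, which is why the two-core case expects 1000000/500000.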
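
Further up in this patch, the s3a Constants drop the OLD_*/NEW_* key pairs, so the camelCase property names are no longer read as a fallback by S3AFileSystem or S3AOutputStream. The snippet below is an illustrative summary, not part of the patch: it pairs each retired name with its replacement as they appear in Constants.java, the values are the DEFAULT_* values from the same file, and the access/secret key strings are placeholders.

// Illustrative only: the renamed s3a keys, with the old camelCase names
// they replace shown in comments. Defaults are the DEFAULT_* values from
// Constants.java; key/secret values are placeholders.
import org.apache.hadoop.conf.Configuration;

public class S3ANewKeysExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // fs.s3a.awsAccessKeyId            -> fs.s3a.access.key
    conf.set("fs.s3a.access.key", "<access key>");
    // fs.s3a.awsSecretAccessKey        -> fs.s3a.secret.key
    conf.set("fs.s3a.secret.key", "<secret key>");
    // fs.s3a.maxConnections            -> fs.s3a.connection.maximum (default 15)
    conf.setInt("fs.s3a.connection.maximum", 15);
    // fs.s3a.secureConnections         -> fs.s3a.connection.ssl.enabled (default true)
    conf.setBoolean("fs.s3a.connection.ssl.enabled", true);
    // fs.s3a.maxErrorRetries           -> fs.s3a.attempts.maximum (default 10)
    conf.setInt("fs.s3a.attempts.maximum", 10);
    // fs.s3a.socketTimeout             -> fs.s3a.connection.timeout (default 50000)
    conf.setInt("fs.s3a.connection.timeout", 50000);
    // fs.s3a.maxPagingKeys             -> fs.s3a.paging.maximum (default 5000)
    conf.setInt("fs.s3a.paging.maximum", 5000);
    // fs.s3a.multipartSize             -> fs.s3a.multipart.size (default 100 MB)
    conf.setLong("fs.s3a.multipart.size", 104857600L);
    // fs.s3a.minMultipartSize          -> fs.s3a.multipart.threshold
    conf.setInt("fs.s3a.multipart.threshold", Integer.MAX_VALUE);
    // fs.s3a.cannedACL                 -> fs.s3a.acl.default
    conf.set("fs.s3a.acl.default", "");
    // fs.s3a.purgeExistingMultiPart    -> fs.s3a.multipart.purge (default false)
    conf.setBoolean("fs.s3a.multipart.purge", false);
    // fs.s3a.purgeExistingMultiPartAge -> fs.s3a.multipart.purge.age (default 14400)
    conf.setLong("fs.s3a.multipart.purge.age", 14400L);
    System.out.println("s3a connection.maximum = "
        + conf.getInt("fs.s3a.connection.maximum", 15));
  }
}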