Merge branch 'trunk' into HDFS-6581

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
This commit is contained in:
arp 2014-09-16 14:32:57 -07:00
commit e0d7fb48fb
41 changed files with 2117 additions and 281 deletions

View File

@ -130,6 +130,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.security.authentication.util.RandomSignerSecretProvider;
import org.apache.hadoop.security.authentication.util.SignerSecretProvider;
import org.apache.hadoop.security.authentication.util.StringSignerSecretProvider;
import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,7 +43,7 @@
/**
* The {@link AuthenticationFilter} enables protecting web application resources with different (pluggable)
* authentication mechanisms.
* authentication mechanisms and signer secret providers.
* <p/>
* Out of the box it provides 2 authentication mechanisms: Pseudo and Kerberos SPNEGO.
* <p/>
@ -60,10 +61,13 @@
* <li>[#PREFIX#.]type: simple|kerberos|#CLASS#, 'simple' is short for the
* {@link PseudoAuthenticationHandler}, 'kerberos' is short for {@link KerberosAuthenticationHandler}, otherwise
* the full class name of the {@link AuthenticationHandler} must be specified.</li>
* <li>[#PREFIX#.]signature.secret: the secret used to sign the HTTP cookie value. The default value is a random
* value. Unless multiple webapp instances need to share the secret the random value is adequate.</li>
* <li>[#PREFIX#.]token.validity: time -in seconds- that the generated token is valid before a
* new authentication is triggered, default value is <code>3600</code> seconds.</li>
* <li>[#PREFIX#.]signature.secret: when signer.secret.provider is set to
* "string" or not specified, this is the value for the secret used to sign the
* HTTP cookie.</li>
* <li>[#PREFIX#.]token.validity: time -in seconds- that the generated token is
* valid before a new authentication is triggered, default value is
* <code>3600</code> seconds. This is also used for the rollover interval for
* the "random" and "zookeeper" SignerSecretProviders.</li>
* <li>[#PREFIX#.]cookie.domain: domain to use for the HTTP cookie that stores the authentication token.</li>
* <li>[#PREFIX#.]cookie.path: path to use for the HTTP cookie that stores the authentication token.</li>
* </ul>
@ -72,6 +76,49 @@
* {@link AuthenticationFilter} will take all the properties that start with the prefix #PREFIX#, it will remove
* the prefix from it and it will pass them to the the authentication handler for initialization. Properties that do
* not start with the prefix will not be passed to the authentication handler initialization.
* <p/>
* Out of the box it provides 3 signer secret provider implementations:
* "string", "random", and "zookeeper"
* <p/>
* Additional signer secret providers are supported via the
* {@link SignerSecretProvider} class.
* <p/>
* For the HTTP cookies mentioned above, the SignerSecretProvider is used to
* determine the secret to use for signing the cookies. Different
* implementations can have different behaviors. The "string" implementation
* simply uses the string set in the [#PREFIX#.]signature.secret property
* mentioned above. The "random" implementation uses a randomly generated
* secret that rolls over at the interval specified by the
* [#PREFIX#.]token.validity mentioned above. The "zookeeper" implementation
* is like the "random" one, except that it synchronizes the random secret
* and rollovers between multiple servers; it's meant for HA services.
* <p/>
* The relevant configuration properties are:
* <ul>
* <li>signer.secret.provider: indicates the name of the SignerSecretProvider
* class to use. Possible values are: "string", "random", "zookeeper", or a
* classname. If not specified, the "string" implementation will be used with
* [#PREFIX#.]signature.secret; and if that's not specified, the "random"
* implementation will be used.</li>
* <li>[#PREFIX#.]signature.secret: When the "string" implementation is
* specified, this value is used as the secret.</li>
* <li>[#PREFIX#.]token.validity: When the "random" or "zookeeper"
* implementations are specified, this value is used as the rollover
* interval.</li>
* </ul>
* <p/>
* The "zookeeper" implementation has additional configuration properties that
* must be specified; see {@link ZKSignerSecretProvider} for details.
* <p/>
* For subclasses of AuthenticationFilter that want additional control over the
* SignerSecretProvider, they can use the following attribute set in the
* ServletContext:
* <ul>
* <li>signer.secret.provider.object: A SignerSecretProvider implementation can
* be passed here that will be used instead of the signer.secret.provider
* configuration property. Note that the class should already be
* initialized.</li>
* </ul>
*/
@InterfaceAudience.Private
@ -112,20 +159,23 @@ public class AuthenticationFilter implements Filter {
/**
* Constant for the configuration property that indicates the name of the
* SignerSecretProvider class to use. If not specified, SIGNATURE_SECRET
* will be used or a random secret.
* SignerSecretProvider class to use.
* Possible values are: "string", "random", "zookeeper", or a classname.
* If not specified, the "string" implementation will be used with
* SIGNATURE_SECRET; and if that's not specified, the "random" implementation
* will be used.
*/
public static final String SIGNER_SECRET_PROVIDER_CLASS =
public static final String SIGNER_SECRET_PROVIDER =
"signer.secret.provider";
/**
* Constant for the attribute that can be used for providing a custom
* object that subclasses the SignerSecretProvider. Note that this should be
* set in the ServletContext and the class should already be initialized.
* If not specified, SIGNER_SECRET_PROVIDER_CLASS will be used.
* Constant for the ServletContext attribute that can be used for providing a
* custom implementation of the SignerSecretProvider. Note that the class
* should already be initialized. If not specified, SIGNER_SECRET_PROVIDER
* will be used.
*/
public static final String SIGNATURE_PROVIDER_ATTRIBUTE =
"org.apache.hadoop.security.authentication.util.SignerSecretProvider";
public static final String SIGNER_SECRET_PROVIDER_ATTRIBUTE =
"signer.secret.provider.object";
private Properties config;
private Signer signer;
@ -138,7 +188,7 @@ public class AuthenticationFilter implements Filter {
private String cookiePath;
/**
* Initializes the authentication filter.
* Initializes the authentication filter and signer secret provider.
* <p/>
* It instantiates and initializes the specified {@link AuthenticationHandler}.
* <p/>
@ -184,35 +234,19 @@ public void init(FilterConfig filterConfig) throws ServletException {
validity = Long.parseLong(config.getProperty(AUTH_TOKEN_VALIDITY, "36000"))
* 1000; //10 hours
secretProvider = (SignerSecretProvider) filterConfig.getServletContext().
getAttribute(SIGNATURE_PROVIDER_ATTRIBUTE);
getAttribute(SIGNER_SECRET_PROVIDER_ATTRIBUTE);
if (secretProvider == null) {
String signerSecretProviderClassName =
config.getProperty(configPrefix + SIGNER_SECRET_PROVIDER_CLASS, null);
if (signerSecretProviderClassName == null) {
String signatureSecret =
config.getProperty(configPrefix + SIGNATURE_SECRET, null);
if (signatureSecret != null) {
secretProvider = new StringSignerSecretProvider(signatureSecret);
} else {
secretProvider = new RandomSignerSecretProvider();
randomSecret = true;
}
} else {
try {
Class<?> klass = Thread.currentThread().getContextClassLoader().
loadClass(signerSecretProviderClassName);
secretProvider = (SignerSecretProvider) klass.newInstance();
customSecretProvider = true;
} catch (ClassNotFoundException ex) {
throw new ServletException(ex);
} catch (InstantiationException ex) {
throw new ServletException(ex);
} catch (IllegalAccessException ex) {
throw new ServletException(ex);
}
Class<? extends SignerSecretProvider> providerClass
= getProviderClass(config);
try {
secretProvider = providerClass.newInstance();
} catch (InstantiationException ex) {
throw new ServletException(ex);
} catch (IllegalAccessException ex) {
throw new ServletException(ex);
}
try {
secretProvider.init(config, validity);
secretProvider.init(config, filterConfig.getServletContext(), validity);
} catch (Exception ex) {
throw new ServletException(ex);
}
@ -225,6 +259,42 @@ public void init(FilterConfig filterConfig) throws ServletException {
cookiePath = config.getProperty(COOKIE_PATH, null);
}
@SuppressWarnings("unchecked")
private Class<? extends SignerSecretProvider> getProviderClass(Properties config)
throws ServletException {
String providerClassName;
String signerSecretProviderName
= config.getProperty(SIGNER_SECRET_PROVIDER, null);
// fallback to old behavior
if (signerSecretProviderName == null) {
String signatureSecret = config.getProperty(SIGNATURE_SECRET, null);
if (signatureSecret != null) {
providerClassName = StringSignerSecretProvider.class.getName();
} else {
providerClassName = RandomSignerSecretProvider.class.getName();
randomSecret = true;
}
} else {
if ("random".equals(signerSecretProviderName)) {
providerClassName = RandomSignerSecretProvider.class.getName();
randomSecret = true;
} else if ("string".equals(signerSecretProviderName)) {
providerClassName = StringSignerSecretProvider.class.getName();
} else if ("zookeeper".equals(signerSecretProviderName)) {
providerClassName = ZKSignerSecretProvider.class.getName();
} else {
providerClassName = signerSecretProviderName;
customSecretProvider = true;
}
}
try {
return (Class<? extends SignerSecretProvider>) Thread.currentThread().
getContextClassLoader().loadClass(providerClassName);
} catch (ClassNotFoundException ex) {
throw new ServletException(ex);
}
}
/**
* Returns the configuration properties of the {@link AuthenticationFilter}
* without the prefix. The returned properties are the same that the

View File

@ -13,12 +13,13 @@
*/
package org.apache.hadoop.security.authentication.util;
import com.google.common.annotations.VisibleForTesting;
import java.util.Random;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A SignerSecretProvider that uses a random number as it's secret. It rolls
* A SignerSecretProvider that uses a random number as its secret. It rolls
* the secret at a regular interval.
*/
@InterfaceStability.Unstable
@ -37,6 +38,7 @@ public RandomSignerSecretProvider() {
* is meant for testing.
* @param seed the seed for the random number generator
*/
@VisibleForTesting
public RandomSignerSecretProvider(long seed) {
super();
rand = new Random(seed);

View File

@ -17,6 +17,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
@ -57,12 +58,14 @@ public RolloverSignerSecretProvider() {
* Initialize the SignerSecretProvider. It initializes the current secret
* and starts the scheduler for the rollover to run at an interval of
* tokenValidity.
* @param config filter configuration
* @param config configuration properties
* @param servletContext servlet context
* @param tokenValidity The amount of time a token is valid for
* @throws Exception
*/
@Override
public void init(Properties config, long tokenValidity) throws Exception {
public void init(Properties config, ServletContext servletContext,
long tokenValidity) throws Exception {
initSecrets(generateNewSecret(), null);
startScheduler(tokenValidity, tokenValidity);
}

View File

@ -14,6 +14,7 @@
package org.apache.hadoop.security.authentication.util;
import java.util.Properties;
import javax.servlet.ServletContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -30,13 +31,13 @@ public abstract class SignerSecretProvider {
/**
* Initialize the SignerSecretProvider
* @param config filter configuration
* @param config configuration properties
* @param servletContext servlet context
* @param tokenValidity The amount of time a token is valid for
* @throws Exception
*/
public abstract void init(Properties config, long tokenValidity)
throws Exception;
public abstract void init(Properties config, ServletContext servletContext,
long tokenValidity) throws Exception;
/**
* Will be called on shutdown; subclasses should perform any cleanup here.
*/

View File

@ -14,8 +14,10 @@
package org.apache.hadoop.security.authentication.util;
import java.util.Properties;
import javax.servlet.ServletContext;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
/**
* A SignerSecretProvider that simply creates a secret based on a given String.
@ -27,14 +29,15 @@ public class StringSignerSecretProvider extends SignerSecretProvider {
private byte[] secret;
private byte[][] secrets;
public StringSignerSecretProvider(String secretStr) {
secret = secretStr.getBytes();
secrets = new byte[][]{secret};
}
public StringSignerSecretProvider() {}
@Override
public void init(Properties config, long tokenValidity) throws Exception {
// do nothing
public void init(Properties config, ServletContext servletContext,
long tokenValidity) throws Exception {
String signatureSecret = config.getProperty(
AuthenticationFilter.SIGNATURE_SECRET, null);
secret = signatureSecret.getBytes();
secrets = new byte[][]{secret};
}
@Override

View File

@ -0,0 +1,506 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.security.authentication.util;
import com.google.common.annotations.VisibleForTesting;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.servlet.ServletContext;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A SignerSecretProvider that synchronizes a rolling random secret between
* multiple servers using ZooKeeper.
* <p/>
* It works by storing the secrets and next rollover time in a ZooKeeper znode.
* All ZKSignerSecretProviders looking at that znode will use those
* secrets and next rollover time to ensure they are synchronized. There is no
* "leader" -- any of the ZKSignerSecretProviders can choose the next secret;
* which one is indeterminate. Kerberos-based ACLs can also be enforced to
* prevent a malicious third-party from getting or setting the secrets. It uses
* its own CuratorFramework client for talking to ZooKeeper. If you want to use
* your own Curator client, you can pass it to ZKSignerSecretProvider; see
* {@link org.apache.hadoop.security.authentication.server.AuthenticationFilter}
* for more details.
* <p/>
* The supported configuration properties are:
* <ul>
* <li>signer.secret.provider.zookeeper.connection.string: indicates the
* ZooKeeper connection string to connect with.</li>
* <li>signer.secret.provider.zookeeper.path: indicates the ZooKeeper path
* to use for storing and retrieving the secrets. All ZKSignerSecretProviders
* that need to coordinate should point to the same path.</li>
* <li>signer.secret.provider.zookeeper.auth.type: indicates the auth type to
* use. Supported values are "none" and "sasl". The default value is "none"
* </li>
* <li>signer.secret.provider.zookeeper.kerberos.keytab: set this to the path
* with the Kerberos keytab file. This is only required if using Kerberos.</li>
* <li>signer.secret.provider.zookeeper.kerberos.principal: set this to the
* Kerberos principal to use. This only required if using Kerberos.</li>
* <li>signer.secret.provider.zookeeper.disconnect.on.close: when set to "true",
* ZKSignerSecretProvider will close the ZooKeeper connection on shutdown. The
* default is "true". Only set this to "false" if a custom Curator client is
* being provided and the disconnection is being handled elsewhere.</li>
* </ul>
*
* The following attribute in the ServletContext can also be set if desired:
* <li>signer.secret.provider.zookeeper.curator.client: A CuratorFramework
* client object can be passed here. If given, the "zookeeper" implementation
* will use this Curator client instead of creating its own, which is useful if
* you already have a Curator client or want more control over its
* configuration.</li>
*/
@InterfaceStability.Unstable
@InterfaceAudience.Private
public class ZKSignerSecretProvider extends RolloverSignerSecretProvider {
private static final String CONFIG_PREFIX =
"signer.secret.provider.zookeeper.";
/**
* Constant for the property that specifies the ZooKeeper connection string.
*/
public static final String ZOOKEEPER_CONNECTION_STRING =
CONFIG_PREFIX + "connection.string";
/**
* Constant for the property that specifies the ZooKeeper path.
*/
public static final String ZOOKEEPER_PATH = CONFIG_PREFIX + "path";
/**
* Constant for the property that specifies the auth type to use. Supported
* values are "none" and "sasl". The default value is "none".
*/
public static final String ZOOKEEPER_AUTH_TYPE = CONFIG_PREFIX + "auth.type";
/**
* Constant for the property that specifies the Kerberos keytab file.
*/
public static final String ZOOKEEPER_KERBEROS_KEYTAB =
CONFIG_PREFIX + "kerberos.keytab";
/**
* Constant for the property that specifies the Kerberos principal.
*/
public static final String ZOOKEEPER_KERBEROS_PRINCIPAL =
CONFIG_PREFIX + "kerberos.principal";
/**
* Constant for the property that specifies whether or not the Curator client
* should disconnect from ZooKeeper on shutdown. The default is "true". Only
* set this to "false" if a custom Curator client is being provided and the
* disconnection is being handled elsewhere.
*/
public static final String DISCONNECT_FROM_ZOOKEEPER_ON_SHUTDOWN =
CONFIG_PREFIX + "disconnect.on.shutdown";
/**
* Constant for the ServletContext attribute that can be used for providing a
* custom CuratorFramework client. If set ZKSignerSecretProvider will use this
* Curator client instead of creating a new one. The providing class is
* responsible for creating and configuring the Curator client (including
* security and ACLs) in this case.
*/
public static final String
ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE =
CONFIG_PREFIX + "curator.client";
private static final String JAAS_LOGIN_ENTRY_NAME =
"ZKSignerSecretProviderClient";
private static Logger LOG = LoggerFactory.getLogger(
ZKSignerSecretProvider.class);
private String path;
/**
* Stores the next secret that will be used after the current one rolls over.
* We do this to help with rollover performance by actually deciding the next
* secret at the previous rollover. This allows us to switch to the next
* secret very quickly. Afterwards, we have plenty of time to decide on the
* next secret.
*/
private volatile byte[] nextSecret;
private final Random rand;
/**
* Stores the current version of the znode.
*/
private int zkVersion;
/**
* Stores the next date that the rollover will occur. This is only used
* for allowing new servers joining later to synchronize their rollover
* with everyone else.
*/
private long nextRolloverDate;
private long tokenValidity;
private CuratorFramework client;
private boolean shouldDisconnect;
private static int INT_BYTES = Integer.SIZE / Byte.SIZE;
private static int LONG_BYTES = Long.SIZE / Byte.SIZE;
private static int DATA_VERSION = 0;
public ZKSignerSecretProvider() {
super();
rand = new Random();
}
/**
* This constructor lets you set the seed of the Random Number Generator and
* is meant for testing.
* @param seed the seed for the random number generator
*/
@VisibleForTesting
public ZKSignerSecretProvider(long seed) {
super();
rand = new Random(seed);
}
@Override
public void init(Properties config, ServletContext servletContext,
long tokenValidity) throws Exception {
Object curatorClientObj = servletContext.getAttribute(
ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE);
if (curatorClientObj != null
&& curatorClientObj instanceof CuratorFramework) {
client = (CuratorFramework) curatorClientObj;
} else {
client = createCuratorClient(config);
}
this.tokenValidity = tokenValidity;
shouldDisconnect = Boolean.parseBoolean(
config.getProperty(DISCONNECT_FROM_ZOOKEEPER_ON_SHUTDOWN, "true"));
path = config.getProperty(ZOOKEEPER_PATH);
if (path == null) {
throw new IllegalArgumentException(ZOOKEEPER_PATH
+ " must be specified");
}
try {
nextRolloverDate = System.currentTimeMillis() + tokenValidity;
// everyone tries to do this, only one will succeed and only when the
// znode doesn't already exist. Everyone else will synchronize on the
// data from the znode
client.create().creatingParentsIfNeeded()
.forPath(path, generateZKData(generateRandomSecret(),
generateRandomSecret(), null));
zkVersion = 0;
LOG.info("Creating secret znode");
} catch (KeeperException.NodeExistsException nee) {
LOG.info("The secret znode already exists, retrieving data");
}
// Synchronize on the data from the znode
// passing true tells it to parse out all the data for initing
pullFromZK(true);
long initialDelay = nextRolloverDate - System.currentTimeMillis();
// If it's in the past, try to find the next interval that we should
// be using
if (initialDelay < 1l) {
int i = 1;
while (initialDelay < 1l) {
initialDelay = nextRolloverDate + tokenValidity * i
- System.currentTimeMillis();
i++;
}
}
super.startScheduler(initialDelay, tokenValidity);
}
/**
* Disconnects from ZooKeeper unless told not to.
*/
@Override
public void destroy() {
if (shouldDisconnect && client != null) {
client.close();
}
super.destroy();
}
@Override
protected synchronized void rollSecret() {
super.rollSecret();
// Try to push the information to ZooKeeper with a potential next secret.
nextRolloverDate += tokenValidity;
byte[][] secrets = super.getAllSecrets();
pushToZK(generateRandomSecret(), secrets[0], secrets[1]);
// Pull info from ZooKeeper to get the decided next secret
// passing false tells it that we don't care about most of the data
pullFromZK(false);
}
@Override
protected byte[] generateNewSecret() {
// We simply return nextSecret because it's already been decided on
return nextSecret;
}
/**
* Pushes proposed data to ZooKeeper. If a different server pushes its data
* first, it gives up.
* @param newSecret The new secret to use
* @param currentSecret The current secret
* @param previousSecret The previous secret
*/
private synchronized void pushToZK(byte[] newSecret, byte[] currentSecret,
byte[] previousSecret) {
byte[] bytes = generateZKData(newSecret, currentSecret, previousSecret);
try {
client.setData().withVersion(zkVersion).forPath(path, bytes);
} catch (KeeperException.BadVersionException bve) {
LOG.debug("Unable to push to znode; another server already did it");
} catch (Exception ex) {
LOG.error("An unexpected exception occured pushing data to ZooKeeper",
ex);
}
}
/**
* Serialize the data to attempt to push into ZooKeeper. The format is this:
* <p>
* [DATA_VERSION, newSecretLength, newSecret, currentSecretLength, currentSecret, previousSecretLength, previousSecret, nextRolloverDate]
* <p>
* Only previousSecret can be null, in which case the format looks like this:
* <p>
* [DATA_VERSION, newSecretLength, newSecret, currentSecretLength, currentSecret, 0, nextRolloverDate]
* <p>
* @param newSecret The new secret to use
* @param currentSecret The current secret
* @param previousSecret The previous secret
* @return The serialized data for ZooKeeper
*/
private synchronized byte[] generateZKData(byte[] newSecret,
byte[] currentSecret, byte[] previousSecret) {
int newSecretLength = newSecret.length;
int currentSecretLength = currentSecret.length;
int previousSecretLength = 0;
if (previousSecret != null) {
previousSecretLength = previousSecret.length;
}
ByteBuffer bb = ByteBuffer.allocate(INT_BYTES + INT_BYTES + newSecretLength
+ INT_BYTES + currentSecretLength + INT_BYTES + previousSecretLength
+ LONG_BYTES);
bb.putInt(DATA_VERSION);
bb.putInt(newSecretLength);
bb.put(newSecret);
bb.putInt(currentSecretLength);
bb.put(currentSecret);
bb.putInt(previousSecretLength);
if (previousSecretLength > 0) {
bb.put(previousSecret);
}
bb.putLong(nextRolloverDate);
return bb.array();
}
/**
* Pulls data from ZooKeeper. If isInit is false, it will only parse the
* next secret and version. If isInit is true, it will also parse the current
* and previous secrets, and the next rollover date; it will also init the
* secrets. Hence, isInit should only be true on startup.
* @param isInit see description above
*/
private synchronized void pullFromZK(boolean isInit) {
try {
Stat stat = new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath(path);
ByteBuffer bb = ByteBuffer.wrap(bytes);
int dataVersion = bb.getInt();
if (dataVersion > DATA_VERSION) {
throw new IllegalStateException("Cannot load data from ZooKeeper; it"
+ "was written with a newer version");
}
int nextSecretLength = bb.getInt();
byte[] nextSecret = new byte[nextSecretLength];
bb.get(nextSecret);
this.nextSecret = nextSecret;
zkVersion = stat.getVersion();
if (isInit) {
int currentSecretLength = bb.getInt();
byte[] currentSecret = new byte[currentSecretLength];
bb.get(currentSecret);
int previousSecretLength = bb.getInt();
byte[] previousSecret = null;
if (previousSecretLength > 0) {
previousSecret = new byte[previousSecretLength];
bb.get(previousSecret);
}
super.initSecrets(currentSecret, previousSecret);
nextRolloverDate = bb.getLong();
}
} catch (Exception ex) {
LOG.error("An unexpected exception occurred while pulling data from"
+ "ZooKeeper", ex);
}
}
private byte[] generateRandomSecret() {
return Long.toString(rand.nextLong()).getBytes();
}
/**
* This method creates the Curator client and connects to ZooKeeper.
* @param config configuration properties
* @return A Curator client
* @throws java.lang.Exception
*/
protected CuratorFramework createCuratorClient(Properties config)
throws Exception {
String connectionString = config.getProperty(
ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
ACLProvider aclProvider;
String authType = config.getProperty(ZOOKEEPER_AUTH_TYPE, "none");
if (authType.equals("sasl")) {
LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+ "and using 'sasl' ACLs");
String principal = setJaasConfiguration(config);
System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
JAAS_LOGIN_ENTRY_NAME);
System.setProperty("zookeeper.authProvider.1",
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
aclProvider = new SASLOwnerACLProvider(principal);
} else { // "none"
LOG.info("Connecting to ZooKeeper without authentication");
aclProvider = new DefaultACLProvider(); // open to everyone
}
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(connectionString)
.retryPolicy(retryPolicy)
.aclProvider(aclProvider)
.build();
cf.start();
return cf;
}
private String setJaasConfiguration(Properties config) throws Exception {
String keytabFile = config.getProperty(ZOOKEEPER_KERBEROS_KEYTAB).trim();
if (keytabFile == null || keytabFile.length() == 0) {
throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_KEYTAB
+ " must be specified");
}
String principal = config.getProperty(ZOOKEEPER_KERBEROS_PRINCIPAL)
.trim();
if (principal == null || principal.length() == 0) {
throw new IllegalArgumentException(ZOOKEEPER_KERBEROS_PRINCIPAL
+ " must be specified");
}
// This is equivalent to writing a jaas.conf file and setting the system
// property, "java.security.auth.login.config", to point to it
JaasConfiguration jConf =
new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
Configuration.setConfiguration(jConf);
return principal.split("[/@]")[0];
}
/**
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
* that gives all permissions only to a single principal.
*/
private static class SASLOwnerACLProvider implements ACLProvider {
private final List<ACL> saslACL;
private SASLOwnerACLProvider(String principal) {
this.saslACL = Collections.singletonList(
new ACL(Perms.ALL, new Id("sasl", principal)));
}
@Override
public List<ACL> getDefaultAcl() {
return saslACL;
}
@Override
public List<ACL> getAclForPath(String path) {
return saslACL;
}
}
/**
* Creates a programmatic version of a jaas.conf file. This can be used
* instead of writing a jaas.conf file and setting the system property,
* "java.security.auth.login.config", to point to that file. It is meant to be
* used for connecting to ZooKeeper.
*/
@InterfaceAudience.Private
public static class JaasConfiguration extends Configuration {
private static AppConfigurationEntry[] entry;
private String entryName;
/**
* Add an entry to the jaas configuration with the passed in name,
* principal, and keytab. The other necessary options will be set for you.
*
* @param entryName The name of the entry (e.g. "Client")
* @param principal The principal of the user
* @param keytab The location of the keytab
*/
public JaasConfiguration(String entryName, String principal, String keytab) {
this.entryName = entryName;
Map<String, String> options = new HashMap<String, String>();
options.put("keyTab", keytab);
options.put("principal", principal);
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("useTicketCache", "false");
options.put("refreshKrb5Config", "true");
String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
options.put("debug", "true");
}
entry = new AppConfigurationEntry[]{
new AppConfigurationEntry(getKrb5LoginModuleName(),
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
options)};
}
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return (entryName.equals(name)) ? entry : null;
}
private String getKrb5LoginModuleName() {
String krb5LoginModuleName;
if (System.getProperty("java.vendor").contains("IBM")) {
krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
} else {
krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
}
return krb5LoginModuleName;
}
}
}

View File

@ -45,14 +45,14 @@ Configuration
* <<<[PREFIX.]type>>>: the authentication type keyword (<<<simple>>> or
<<<kerberos>>>) or a Authentication handler implementation.
* <<<[PREFIX.]signature.secret>>>: The secret to SHA-sign the generated
authentication tokens. If a secret is not provided a random secret is
generated at start up time. If using multiple web application instances
behind a load-balancer a secret must be set for the application to work
properly.
* <<<[PREFIX.]signature.secret>>>: When <<<signer.secret.provider>>> is set to
<<<string>>> or not specified, this is the value for the secret used to sign
the HTTP cookie.
* <<<[PREFIX.]token.validity>>>: The validity -in seconds- of the generated
authentication token. The default value is <<<3600>>> seconds.
authentication token. The default value is <<<3600>>> seconds. This is also
used for the rollover interval when <<<signer.secret.provider>>> is set to
<<<random>>> or <<<zookeeper>>>.
* <<<[PREFIX.]cookie.domain>>>: domain to use for the HTTP cookie that stores
the authentication token.
@ -60,6 +60,12 @@ Configuration
* <<<[PREFIX.]cookie.path>>>: path to use for the HTTP cookie that stores the
authentication token.
* <<<signer.secret.provider>>>: indicates the name of the SignerSecretProvider
class to use. Possible values are: <<<string>>>, <<<random>>>,
<<<zookeeper>>>, or a classname. If not specified, the <<<string>>>
implementation will be used; and failing that, the <<<random>>>
implementation will be used.
** Kerberos Configuration
<<IMPORTANT>>: A KDC must be configured and running.
@ -239,3 +245,133 @@ Configuration
...
</web-app>
+---+
** SignerSecretProvider Configuration
The SignerSecretProvider is used to provide more advanced behaviors for the
secret used for signing the HTTP Cookies.
These are the relevant configuration properties:
* <<<signer.secret.provider>>>: indicates the name of the
SignerSecretProvider class to use. Possible values are: "string",
"random", "zookeeper", or a classname. If not specified, the "string"
implementation will be used; and failing that, the "random" implementation
will be used.
* <<<[PREFIX.]signature.secret>>>: When <<<signer.secret.provider>>> is set
to <<<string>>> or not specified, this is the value for the secret used to
sign the HTTP cookie.
* <<<[PREFIX.]token.validity>>>: The validity -in seconds- of the generated
authentication token. The default value is <<<3600>>> seconds. This is
also used for the rollover interval when <<<signer.secret.provider>>> is
set to <<<random>>> or <<<zookeeper>>>.
The following configuration properties are specific to the <<<zookeeper>>>
implementation:
* <<<signer.secret.provider.zookeeper.connection.string>>>: Indicates the
ZooKeeper connection string to connect with.
* <<<signer.secret.provider.zookeeper.path>>>: Indicates the ZooKeeper path
to use for storing and retrieving the secrets. All servers
that need to coordinate their secret should point to the same path
* <<<signer.secret.provider.zookeeper.auth.type>>>: Indicates the auth type
to use. Supported values are <<<none>>> and <<<sasl>>>. The default
value is <<<none>>>.
* <<<signer.secret.provider.zookeeper.kerberos.keytab>>>: Set this to the
path with the Kerberos keytab file. This is only required if using
Kerberos.
* <<<signer.secret.provider.zookeeper.kerberos.principal>>>: Set this to the
Kerberos principal to use. This only required if using Kerberos.
<<Example>>:
+---+
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee">
...
<filter>
<!-- AuthenticationHandler configs not shown -->
<init-param>
<param-name>signer.secret.provider</param-name>
<param-value>string</param-value>
</init-param>
<init-param>
<param-name>signature.secret</param-name>
<param-value>my_secret</param-value>
</init-param>
</filter>
...
</web-app>
+---+
<<Example>>:
+---+
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee">
...
<filter>
<!-- AuthenticationHandler configs not shown -->
<init-param>
<param-name>signer.secret.provider</param-name>
<param-value>random</param-value>
</init-param>
<init-param>
<param-name>token.validity</param-name>
<param-value>30</param-value>
</init-param>
</filter>
...
</web-app>
+---+
<<Example>>:
+---+
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee">
...
<filter>
<!-- AuthenticationHandler configs not shown -->
<init-param>
<param-name>signer.secret.provider</param-name>
<param-value>zookeeper</param-value>
</init-param>
<init-param>
<param-name>token.validity</param-name>
<param-value>30</param-value>
</init-param>
<init-param>
<param-name>signer.secret.provider.zookeeper.connection.string</param-name>
<param-value>zoo1:2181,zoo2:2181,zoo3:2181</param-value>
</init-param>
<init-param>
<param-name>signer.secret.provider.zookeeper.path</param-name>
<param-value>/myapp/secrets</param-value>
</init-param>
<init-param>
<param-name>signer.secret.provider.zookeeper.use.kerberos.acls</param-name>
<param-value>true</param-value>
</init-param>
<init-param>
<param-name>signer.secret.provider.zookeeper.kerberos.keytab</param-name>
<param-value>/tmp/auth.keytab</param-value>
</init-param>
<init-param>
<param-name>signer.secret.provider.zookeeper.kerberos.principal</param-name>
<param-value>HTTP/localhost@LOCALHOST</param-value>
</init-param>
</filter>
...
</web-app>
+---+

View File

@ -44,6 +44,11 @@ Hadoop Auth, Java HTTP SPNEGO ${project.version}
Subsequent HTTP client requests presenting the signed HTTP Cookie have access
to the protected resources until the HTTP Cookie expires.
The secret used to sign the HTTP Cookie has multiple implementations that
provide different behaviors, including a hardcoded secret string, a rolling
randomly generated secret, and a rolling randomly generated secret
synchronized between multiple servers using ZooKeeper.
* User Documentation
* {{{./Examples.html}Examples}}

View File

@ -162,7 +162,8 @@ public void testInit() throws Exception {
AuthenticationFilter.AUTH_TOKEN_VALIDITY)).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
Assert.assertEquals(PseudoAuthenticationHandler.class, filter.getAuthenticationHandler().getClass());
@ -186,7 +187,8 @@ public void testInit() throws Exception {
AuthenticationFilter.SIGNATURE_SECRET)).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
Assert.assertFalse(filter.isRandomSecret());
@ -206,10 +208,11 @@ public void testInit() throws Exception {
AuthenticationFilter.SIGNATURE_SECRET)).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE)).thenReturn(
new SignerSecretProvider() {
@Override
public void init(Properties config, long tokenValidity) {
public void init(Properties config, ServletContext servletContext,
long tokenValidity) {
}
@Override
public byte[] getCurrentSecret() {
@ -241,7 +244,8 @@ public byte[][] getAllSecrets() {
AuthenticationFilter.COOKIE_PATH)).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
Assert.assertEquals(".foo.com", filter.getCookieDomain());
@ -265,7 +269,8 @@ public byte[][] getAllSecrets() {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
Assert.assertTrue(DummyAuthenticationHandler.init);
@ -304,7 +309,8 @@ public void testInitCaseSensitivity() throws Exception {
AuthenticationFilter.AUTH_TOKEN_VALIDITY)).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -330,7 +336,8 @@ public void testGetRequestURL() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -361,13 +368,20 @@ public void testGetToken() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE);
token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -398,14 +412,21 @@ public void testGetTokenExpired() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
AuthenticationToken token =
new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE);
token.setExpires(System.currentTimeMillis() - TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -443,13 +464,20 @@ public void testGetTokenInvalidType() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
AuthenticationToken token = new AuthenticationToken("u", "p", "invalidtype");
token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -485,7 +513,8 @@ public void testDoFilterNotAuthenticated() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -538,7 +567,8 @@ private void _testDoFilterAuthentication(boolean withDomainPath,
".return", "expired.token")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
if (withDomainPath) {
@ -593,7 +623,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Mockito.verify(chain).doFilter(Mockito.any(ServletRequest.class),
Mockito.any(ServletResponse.class));
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String value = signer.verifyAndExtract(v);
AuthenticationToken token = AuthenticationToken.parse(value);
assertThat(token.getExpires(), not(0L));
@ -662,7 +698,8 @@ public void testDoFilterAuthenticated() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -671,7 +708,13 @@ public void testDoFilterAuthenticated() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", "t");
token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -716,7 +759,8 @@ public void testDoFilterAuthenticationFailure() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -783,7 +827,8 @@ public void testDoFilterAuthenticatedExpired() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -792,7 +837,13 @@ public void testDoFilterAuthenticatedExpired() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", DummyAuthenticationHandler.TYPE);
token.setExpires(System.currentTimeMillis() - TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider(secret));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, secret);
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -854,7 +905,8 @@ public void testDoFilterAuthenticatedInvalidType() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -863,7 +915,13 @@ public void testDoFilterAuthenticatedInvalidType() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", "invalidtype");
token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider(secret));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, secret);
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
@ -893,7 +951,8 @@ public void testManagementOperation() throws Exception {
"management.operation.return")).elements());
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(
AuthenticationFilter.SIGNATURE_PROVIDER_ATTRIBUTE)).thenReturn(null);
AuthenticationFilter.SIGNER_SECRET_PROVIDER_ATTRIBUTE))
.thenReturn(null);
Mockito.when(config.getServletContext()).thenReturn(context);
filter.init(config);
@ -914,7 +973,13 @@ public void testManagementOperation() throws Exception {
AuthenticationToken token = new AuthenticationToken("u", "p", "t");
token.setExpires(System.currentTimeMillis() + TOKEN_VALIDITY_SEC);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, TOKEN_VALIDITY_SEC);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
Cookie cookie = new Cookie(AuthenticatedURL.AUTH_COOKIE, tokenSigned);
Mockito.when(request.getCookies()).thenReturn(new Cookie[]{cookie});

View File

@ -0,0 +1,55 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.security.authentication.util;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import org.junit.Assert;
import org.junit.Test;
public class TestJaasConfiguration {
// We won't test actually using it to authenticate because that gets messy and
// may conflict with other tests; but we can test that it otherwise behaves
// correctly
@Test
public void test() throws Exception {
String krb5LoginModuleName;
if (System.getProperty("java.vendor").contains("IBM")) {
krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
} else {
krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
}
ZKSignerSecretProvider.JaasConfiguration jConf =
new ZKSignerSecretProvider.JaasConfiguration("foo", "foo/localhost",
"/some/location/foo.keytab");
AppConfigurationEntry[] entries = jConf.getAppConfigurationEntry("bar");
Assert.assertNull(entries);
entries = jConf.getAppConfigurationEntry("foo");
Assert.assertEquals(1, entries.length);
AppConfigurationEntry entry = entries[0];
Assert.assertEquals(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
entry.getControlFlag());
Assert.assertEquals(krb5LoginModuleName, entry.getLoginModuleName());
Map<String, ?> options = entry.getOptions();
Assert.assertEquals("/some/location/foo.keytab", options.get("keyTab"));
Assert.assertEquals("foo/localhost", options.get("principal"));
Assert.assertEquals("true", options.get("useKeyTab"));
Assert.assertEquals("true", options.get("storeKey"));
Assert.assertEquals("false", options.get("useTicketCache"));
Assert.assertEquals("true", options.get("refreshKrb5Config"));
Assert.assertEquals(6, options.size());
}
}

View File

@ -31,7 +31,7 @@ public void testGetAndRollSecrets() throws Exception {
RandomSignerSecretProvider secretProvider =
new RandomSignerSecretProvider(seed);
try {
secretProvider.init(null, rolloverFrequency);
secretProvider.init(null, null, rolloverFrequency);
byte[] currentSecret = secretProvider.getCurrentSecret();
byte[][] allSecrets = secretProvider.getAllSecrets();

View File

@ -28,7 +28,7 @@ public void testGetAndRollSecrets() throws Exception {
new TRolloverSignerSecretProvider(
new byte[][]{secret1, secret2, secret3});
try {
secretProvider.init(null, rolloverFrequency);
secretProvider.init(null, null, rolloverFrequency);
byte[] currentSecret = secretProvider.getCurrentSecret();
byte[][] allSecrets = secretProvider.getAllSecrets();

View File

@ -14,6 +14,8 @@
package org.apache.hadoop.security.authentication.util;
import java.util.Properties;
import javax.servlet.ServletContext;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.junit.Assert;
import org.junit.Test;
@ -21,7 +23,7 @@ public class TestSigner {
@Test
public void testNullAndEmptyString() throws Exception {
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
Signer signer = new Signer(createStringSignerSecretProvider());
try {
signer.sign(null);
Assert.fail();
@ -42,7 +44,7 @@ public void testNullAndEmptyString() throws Exception {
@Test
public void testSignature() throws Exception {
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
Signer signer = new Signer(createStringSignerSecretProvider());
String s1 = signer.sign("ok");
String s2 = signer.sign("ok");
String s3 = signer.sign("wrong");
@ -52,7 +54,7 @@ public void testSignature() throws Exception {
@Test
public void testVerify() throws Exception {
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
Signer signer = new Signer(createStringSignerSecretProvider());
String t = "test";
String s = signer.sign(t);
String e = signer.verifyAndExtract(s);
@ -61,7 +63,7 @@ public void testVerify() throws Exception {
@Test
public void testInvalidSignedText() throws Exception {
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
Signer signer = new Signer(createStringSignerSecretProvider());
try {
signer.verifyAndExtract("test");
Assert.fail();
@ -74,7 +76,7 @@ public void testInvalidSignedText() throws Exception {
@Test
public void testTampering() throws Exception {
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
Signer signer = new Signer(createStringSignerSecretProvider());
String t = "test";
String s = signer.sign(t);
s += "x";
@ -88,6 +90,14 @@ public void testTampering() throws Exception {
}
}
private StringSignerSecretProvider createStringSignerSecretProvider() throws Exception {
StringSignerSecretProvider secretProvider = new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, -1);
return secretProvider;
}
@Test
public void testMultipleSecrets() throws Exception {
TestSignerSecretProvider secretProvider = new TestSignerSecretProvider();
@ -128,7 +138,8 @@ class TestSignerSecretProvider extends SignerSecretProvider {
private byte[] previousSecret;
@Override
public void init(Properties config, long tokenValidity) {
public void init(Properties config, ServletContext servletContext,
long tokenValidity) {
}
@Override

View File

@ -13,6 +13,8 @@
*/
package org.apache.hadoop.security.authentication.util;
import java.util.Properties;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.junit.Assert;
import org.junit.Test;
@ -22,8 +24,11 @@ public class TestStringSignerSecretProvider {
public void testGetSecrets() throws Exception {
String secretStr = "secret";
StringSignerSecretProvider secretProvider
= new StringSignerSecretProvider(secretStr);
secretProvider.init(null, -1);
= new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(
AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, -1);
byte[] secretBytes = secretStr.getBytes();
Assert.assertArrayEquals(secretBytes, secretProvider.getCurrentSecret());
byte[][] allSecrets = secretProvider.getAllSecrets();

View File

@ -0,0 +1,270 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License. See accompanying LICENSE file.
*/
package org.apache.hadoop.security.authentication.util;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import javax.servlet.ServletContext;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestZKSignerSecretProvider {
private TestingServer zkServer;
@Before
public void setup() throws Exception {
zkServer = new TestingServer();
}
@After
public void teardown() throws Exception {
if (zkServer != null) {
zkServer.stop();
zkServer.close();
}
}
@Test
// Test just one ZKSignerSecretProvider to verify that it works in the
// simplest case
public void testOne() throws Exception {
long rolloverFrequency = 15 * 1000; // rollover every 15 sec
// use the same seed so we can predict the RNG
long seed = System.currentTimeMillis();
Random rand = new Random(seed);
byte[] secret2 = Long.toString(rand.nextLong()).getBytes();
byte[] secret1 = Long.toString(rand.nextLong()).getBytes();
byte[] secret3 = Long.toString(rand.nextLong()).getBytes();
ZKSignerSecretProvider secretProvider = new ZKSignerSecretProvider(seed);
Properties config = new Properties();
config.setProperty(
ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
zkServer.getConnectString());
config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH,
"/secret");
try {
secretProvider.init(config, getDummyServletContext(), rolloverFrequency);
byte[] currentSecret = secretProvider.getCurrentSecret();
byte[][] allSecrets = secretProvider.getAllSecrets();
Assert.assertArrayEquals(secret1, currentSecret);
Assert.assertEquals(2, allSecrets.length);
Assert.assertArrayEquals(secret1, allSecrets[0]);
Assert.assertNull(allSecrets[1]);
Thread.sleep((rolloverFrequency + 2000));
currentSecret = secretProvider.getCurrentSecret();
allSecrets = secretProvider.getAllSecrets();
Assert.assertArrayEquals(secret2, currentSecret);
Assert.assertEquals(2, allSecrets.length);
Assert.assertArrayEquals(secret2, allSecrets[0]);
Assert.assertArrayEquals(secret1, allSecrets[1]);
Thread.sleep((rolloverFrequency + 2000));
currentSecret = secretProvider.getCurrentSecret();
allSecrets = secretProvider.getAllSecrets();
Assert.assertArrayEquals(secret3, currentSecret);
Assert.assertEquals(2, allSecrets.length);
Assert.assertArrayEquals(secret3, allSecrets[0]);
Assert.assertArrayEquals(secret2, allSecrets[1]);
Thread.sleep((rolloverFrequency + 2000));
} finally {
secretProvider.destroy();
}
}
@Test
public void testMultipleInit() throws Exception {
long rolloverFrequency = 15 * 1000; // rollover every 15 sec
// use the same seed so we can predict the RNG
long seedA = System.currentTimeMillis();
Random rand = new Random(seedA);
byte[] secretA2 = Long.toString(rand.nextLong()).getBytes();
byte[] secretA1 = Long.toString(rand.nextLong()).getBytes();
// use the same seed so we can predict the RNG
long seedB = System.currentTimeMillis() + rand.nextLong();
rand = new Random(seedB);
byte[] secretB2 = Long.toString(rand.nextLong()).getBytes();
byte[] secretB1 = Long.toString(rand.nextLong()).getBytes();
// use the same seed so we can predict the RNG
long seedC = System.currentTimeMillis() + rand.nextLong();
rand = new Random(seedC);
byte[] secretC2 = Long.toString(rand.nextLong()).getBytes();
byte[] secretC1 = Long.toString(rand.nextLong()).getBytes();
ZKSignerSecretProvider secretProviderA = new ZKSignerSecretProvider(seedA);
ZKSignerSecretProvider secretProviderB = new ZKSignerSecretProvider(seedB);
ZKSignerSecretProvider secretProviderC = new ZKSignerSecretProvider(seedC);
Properties config = new Properties();
config.setProperty(
ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
zkServer.getConnectString());
config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH,
"/secret");
try {
secretProviderA.init(config, getDummyServletContext(), rolloverFrequency);
secretProviderB.init(config, getDummyServletContext(), rolloverFrequency);
secretProviderC.init(config, getDummyServletContext(), rolloverFrequency);
byte[] currentSecretA = secretProviderA.getCurrentSecret();
byte[][] allSecretsA = secretProviderA.getAllSecrets();
byte[] currentSecretB = secretProviderB.getCurrentSecret();
byte[][] allSecretsB = secretProviderB.getAllSecrets();
byte[] currentSecretC = secretProviderC.getCurrentSecret();
byte[][] allSecretsC = secretProviderC.getAllSecrets();
Assert.assertArrayEquals(currentSecretA, currentSecretB);
Assert.assertArrayEquals(currentSecretB, currentSecretC);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertEquals(2, allSecretsB.length);
Assert.assertEquals(2, allSecretsC.length);
Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]);
Assert.assertArrayEquals(allSecretsB[0], allSecretsC[0]);
Assert.assertNull(allSecretsA[1]);
Assert.assertNull(allSecretsB[1]);
Assert.assertNull(allSecretsC[1]);
char secretChosen = 'z';
if (Arrays.equals(secretA1, currentSecretA)) {
Assert.assertArrayEquals(secretA1, allSecretsA[0]);
secretChosen = 'A';
} else if (Arrays.equals(secretB1, currentSecretB)) {
Assert.assertArrayEquals(secretB1, allSecretsA[0]);
secretChosen = 'B';
}else if (Arrays.equals(secretC1, currentSecretC)) {
Assert.assertArrayEquals(secretC1, allSecretsA[0]);
secretChosen = 'C';
} else {
Assert.fail("It appears that they all agreed on the same secret, but "
+ "not one of the secrets they were supposed to");
}
Thread.sleep((rolloverFrequency + 2000));
currentSecretA = secretProviderA.getCurrentSecret();
allSecretsA = secretProviderA.getAllSecrets();
currentSecretB = secretProviderB.getCurrentSecret();
allSecretsB = secretProviderB.getAllSecrets();
currentSecretC = secretProviderC.getCurrentSecret();
allSecretsC = secretProviderC.getAllSecrets();
Assert.assertArrayEquals(currentSecretA, currentSecretB);
Assert.assertArrayEquals(currentSecretB, currentSecretC);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertEquals(2, allSecretsB.length);
Assert.assertEquals(2, allSecretsC.length);
Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]);
Assert.assertArrayEquals(allSecretsB[0], allSecretsC[0]);
Assert.assertArrayEquals(allSecretsA[1], allSecretsB[1]);
Assert.assertArrayEquals(allSecretsB[1], allSecretsC[1]);
// The second secret used is prechosen by whoever won the init; so it
// should match with whichever we saw before
if (secretChosen == 'A') {
Assert.assertArrayEquals(secretA2, currentSecretA);
} else if (secretChosen == 'B') {
Assert.assertArrayEquals(secretB2, currentSecretA);
} else if (secretChosen == 'C') {
Assert.assertArrayEquals(secretC2, currentSecretA);
}
} finally {
secretProviderC.destroy();
secretProviderB.destroy();
secretProviderA.destroy();
}
}
@Test
public void testMultipleUnsychnronized() throws Exception {
long rolloverFrequency = 15 * 1000; // rollover every 15 sec
// use the same seed so we can predict the RNG
long seedA = System.currentTimeMillis();
Random rand = new Random(seedA);
byte[] secretA2 = Long.toString(rand.nextLong()).getBytes();
byte[] secretA1 = Long.toString(rand.nextLong()).getBytes();
byte[] secretA3 = Long.toString(rand.nextLong()).getBytes();
// use the same seed so we can predict the RNG
long seedB = System.currentTimeMillis() + rand.nextLong();
rand = new Random(seedB);
byte[] secretB2 = Long.toString(rand.nextLong()).getBytes();
byte[] secretB1 = Long.toString(rand.nextLong()).getBytes();
byte[] secretB3 = Long.toString(rand.nextLong()).getBytes();
ZKSignerSecretProvider secretProviderA = new ZKSignerSecretProvider(seedA);
ZKSignerSecretProvider secretProviderB = new ZKSignerSecretProvider(seedB);
Properties config = new Properties();
config.setProperty(
ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
zkServer.getConnectString());
config.setProperty(ZKSignerSecretProvider.ZOOKEEPER_PATH,
"/secret");
try {
secretProviderA.init(config, getDummyServletContext(), rolloverFrequency);
byte[] currentSecretA = secretProviderA.getCurrentSecret();
byte[][] allSecretsA = secretProviderA.getAllSecrets();
Assert.assertArrayEquals(secretA1, currentSecretA);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertArrayEquals(secretA1, allSecretsA[0]);
Assert.assertNull(allSecretsA[1]);
Thread.sleep((rolloverFrequency + 2000));
currentSecretA = secretProviderA.getCurrentSecret();
allSecretsA = secretProviderA.getAllSecrets();
Assert.assertArrayEquals(secretA2, currentSecretA);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertArrayEquals(secretA2, allSecretsA[0]);
Assert.assertArrayEquals(secretA1, allSecretsA[1]);
Thread.sleep((rolloverFrequency / 5));
secretProviderB.init(config, getDummyServletContext(), rolloverFrequency);
byte[] currentSecretB = secretProviderB.getCurrentSecret();
byte[][] allSecretsB = secretProviderB.getAllSecrets();
Assert.assertArrayEquals(secretA2, currentSecretB);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertArrayEquals(secretA2, allSecretsB[0]);
Assert.assertArrayEquals(secretA1, allSecretsB[1]);
Thread.sleep((rolloverFrequency));
currentSecretA = secretProviderA.getCurrentSecret();
allSecretsA = secretProviderA.getAllSecrets();
currentSecretB = secretProviderB.getCurrentSecret();
allSecretsB = secretProviderB.getAllSecrets();
Assert.assertArrayEquals(currentSecretA, currentSecretB);
Assert.assertEquals(2, allSecretsA.length);
Assert.assertEquals(2, allSecretsB.length);
Assert.assertArrayEquals(allSecretsA[0], allSecretsB[0]);
Assert.assertArrayEquals(allSecretsA[1], allSecretsB[1]);
if (Arrays.equals(secretA3, currentSecretA)) {
Assert.assertArrayEquals(secretA3, allSecretsA[0]);
} else if (Arrays.equals(secretB3, currentSecretB)) {
Assert.assertArrayEquals(secretB3, allSecretsA[0]);
} else {
Assert.fail("It appears that they all agreed on the same secret, but "
+ "not one of the secrets they were supposed to");
}
} finally {
secretProviderB.destroy();
secretProviderA.destroy();
}
}
private ServletContext getDummyServletContext() {
ServletContext servletContext = Mockito.mock(ServletContext.class);
Mockito.when(servletContext.getAttribute(ZKSignerSecretProvider
.ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE))
.thenReturn(null);
return servletContext;
}
}

View File

@ -325,6 +325,8 @@ Trunk (Unreleased)
HADOOP-11052. hadoop_verify_secure_prereq's results aren't checked
in bin/hdfs (aw)
HADOOP-11055. non-daemon pid files are missing (aw)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -517,6 +519,12 @@ Release 2.6.0 - UNRELEASED
HADOOP-11074. Move s3-related FS connector code to hadoop-aws (David S.
Wang via Colin Patrick McCabe)
HADOOP-11091. Eliminate old configuration parameter names from s3a (David
S. Wang via Colin Patrick McCabe)
HADOOP-10868. AuthenticationFilter should support externalizing the
secret for signing and provide rotation support. (rkanter via tucu)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

View File

@ -645,7 +645,7 @@ function hadoop_verify_secure_prereq
# ${EUID} comes from the shell itself!
if [[ "${EUID}" -ne 0 ]] && [[ -z "${HADOOP_SECURE_COMMAND}" ]]; then
hadoop_error "ERROR: You must be a privileged in order to run a secure serice."
hadoop_error "ERROR: You must be a privileged user in order to run a secure service."
exit 1
else
return 0
@ -704,7 +704,8 @@ function hadoop_verify_logdir
rm "${HADOOP_LOG_DIR}/$$" >/dev/null 2>&1
}
function hadoop_status_daemon() {
function hadoop_status_daemon()
{
#
# LSB 4.1.0 compatible status command (1)
#
@ -760,11 +761,19 @@ function hadoop_start_daemon
# so complex! so wow! much java!
local command=$1
local class=$2
shift 2
local pidfile=$3
shift 3
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
# this is for the non-daemon pid creation
#shellcheck disable=SC2086
echo $$ > "${pidfile}" 2>/dev/null
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot write ${command} pid ${pidfile}."
fi
export CLASSPATH
#shellcheck disable=SC2086
exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
@ -779,27 +788,42 @@ function hadoop_start_daemon_wrapper
local pidfile=$3
local outfile=$4
shift 4
local counter
hadoop_rotate_log "${outfile}"
hadoop_start_daemon "${daemonname}" \
"$class" "$@" >> "${outfile}" 2>&1 < /dev/null &
"$class" \
"${pidfile}" \
"$@" >> "${outfile}" 2>&1 < /dev/null &
# we need to avoid a race condition here
# so let's wait for the fork to finish
# before overriding with the daemonized pid
(( counter=0 ))
while [[ ! -f ${pidfile} && ${counter} -le 5 ]]; do
sleep 1
(( counter++ ))
done
# this is for daemon pid creation
#shellcheck disable=SC2086
echo $! > "${pidfile}" 2>/dev/null
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot write pid ${pidfile}."
hadoop_error "ERROR: Cannot write ${daemonname} pid ${pidfile}."
fi
# shellcheck disable=SC2086
renice "${HADOOP_NICENESS}" $! >/dev/null 2>&1
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot set priority of process $!"
hadoop_error "ERROR: Cannot set priority of ${daemoname} process $!"
fi
# shellcheck disable=SC2086
disown $! 2>&1
disown %+ >/dev/null 2>&1
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot disconnect process $!"
hadoop_error "ERROR: Cannot disconnect ${daemoname} process $!"
fi
sleep 1
@ -829,7 +853,8 @@ function hadoop_start_secure_daemon
# where to send stderr. same thing, except &2 = stderr
local daemonerrfile=$5
shift 5
local privpidfile=$6
shift 6
hadoop_rotate_log "${daemonoutfile}"
hadoop_rotate_log "${daemonerrfile}"
@ -849,17 +874,23 @@ function hadoop_start_secure_daemon
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
#shellcheck disable=SC2086
echo $$ > "${privpidfile}" 2>/dev/null
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot write ${daemoname} pid ${privpidfile}."
fi
exec "${jsvc}" \
"-Dproc_${daemonname}" \
-outfile "${daemonoutfile}" \
-errfile "${daemonerrfile}" \
-pidfile "${daemonpidfile}" \
-nodetach \
-user "${HADOOP_SECURE_USER}" \
-cp "${CLASSPATH}" \
${HADOOP_OPTS} \
"${class}" "$@"
"-Dproc_${daemonname}" \
-outfile "${daemonoutfile}" \
-errfile "${daemonerrfile}" \
-pidfile "${daemonpidfile}" \
-nodetach \
-user "${HADOOP_SECURE_USER}" \
-cp "${CLASSPATH}" \
${HADOOP_OPTS} \
"${class}" "$@"
}
function hadoop_start_secure_daemon_wrapper
@ -886,39 +917,52 @@ function hadoop_start_secure_daemon_wrapper
local daemonerrfile=$7
shift 7
local counter
hadoop_rotate_log "${jsvcoutfile}"
hadoop_start_secure_daemon \
"${daemonname}" \
"${class}" \
"${daemonpidfile}" \
"${daemonoutfile}" \
"${daemonerrfile}" "$@" >> "${jsvcoutfile}" 2>&1 < /dev/null &
# This wrapper should only have one child. Unlike Shawty Lo.
"${daemonname}" \
"${class}" \
"${daemonpidfile}" \
"${daemonoutfile}" \
"${daemonerrfile}" \
"${jsvcpidfile}" "$@" >> "${jsvcoutfile}" 2>&1 < /dev/null &
# we need to avoid a race condition here
# so let's wait for the fork to finish
# before overriding with the daemonized pid
(( counter=0 ))
while [[ ! -f ${pidfile} && ${counter} -le 5 ]]; do
sleep 1
(( counter++ ))
done
# this is for the daemon pid creation
#shellcheck disable=SC2086
echo $! > "${jsvcpidfile}" 2>/dev/null
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot write pid ${pidfile}."
hadoop_error "ERROR: Cannot write ${daemonname} pid ${pidfile}."
fi
sleep 1
#shellcheck disable=SC2086
renice "${HADOOP_NICENESS}" $! >/dev/null 2>&1
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot set priority of process $!"
hadoop_error "ERROR: Cannot set priority of ${daemonname} process $!"
fi
if [[ -f "${daemonpidfile}" ]]; then
#shellcheck disable=SC2046
renice "${HADOOP_NICENESS}" $(cat "${daemonpidfile}") >/dev/null 2>&1
renice "${HADOOP_NICENESS}" $(cat "${daemonpidfile}" 2>/dev/null) >/dev/null 2>&1
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot set priority of process $(cat "${daemonpidfile}")"
hadoop_error "ERROR: Cannot set priority of ${daemonname} process $(cat "${daemonpidfile}" 2>/dev/null)"
fi
fi
#shellcheck disable=SC2086
disown $! 2>&1
#shellcheck disable=SC2046
disown %+ >/dev/null 2>&1
if [[ $? -gt 0 ]]; then
hadoop_error "ERROR: Cannot disconnect process $!"
hadoop_error "ERROR: Cannot disconnect ${daemonname} process $!"
fi
# capture the ulimit output
su "${HADOOP_SECURE_USER}" -c 'bash -c "ulimit -a"' >> "${jsvcoutfile}" 2>&1
@ -994,7 +1038,7 @@ function hadoop_daemon_handler
hadoop_verify_logdir
hadoop_status_daemon "${daemon_pidfile}"
if [[ $? == 0 ]]; then
hadoop_error "${daemonname} running as process $(cat "${daemon_pidfile}"). Stop it first."
hadoop_error "${daemonname} is running as process $(cat "${daemon_pidfile}"). Stop it first."
exit 1
else
# stale pid file, so just remove it and continue on
@ -1003,7 +1047,7 @@ function hadoop_daemon_handler
##COMPAT - differenticate between --daemon start and nothing
# "nothing" shouldn't detach
if [[ "$daemonmode" = "default" ]]; then
hadoop_start_daemon "${daemonname}" "${class}" "$@"
hadoop_start_daemon "${daemonname}" "${class}" "${daemon_pidfile}" "$@"
else
hadoop_start_daemon_wrapper "${daemonname}" \
"${class}" "${daemon_pidfile}" "${daemon_outfile}" "$@"
@ -1042,7 +1086,7 @@ function hadoop_secure_daemon_handler
hadoop_verify_logdir
hadoop_status_daemon "${daemon_pidfile}"
if [[ $? == 0 ]]; then
hadoop_error "${daemonname} running as process $(cat "${daemon_pidfile}"). Stop it first."
hadoop_error "${daemonname} is running as process $(cat "${daemon_pidfile}"). Stop it first."
exit 1
else
# stale pid file, so just remove it and continue on
@ -1054,7 +1098,7 @@ function hadoop_secure_daemon_handler
if [[ "${daemonmode}" = "default" ]]; then
hadoop_start_secure_daemon "${daemonname}" "${classname}" \
"${daemon_pidfile}" "${daemon_outfile}" \
"${priv_errfile}" "$@"
"${priv_errfile}" "${priv_pidfile}" "$@"
else
hadoop_start_secure_daemon_wrapper "${daemonname}" "${classname}" \
"${daemon_pidfile}" "${daemon_outfile}" \

View File

@ -66,6 +66,8 @@
import org.mortbay.jetty.webapp.WebAppContext;
import com.google.common.collect.Maps;
import java.util.Properties;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.util.StringSignerSecretProvider;
public class TestHttpFSServer extends HFSTestCase {
@ -685,7 +687,11 @@ public void testDelegationTokenOperations() throws Exception {
new AuthenticationToken("u", "p",
new KerberosDelegationTokenAuthenticationHandler().getType());
token.setExpires(System.currentTimeMillis() + 100000000);
Signer signer = new Signer(new StringSignerSecretProvider("secret"));
StringSignerSecretProvider secretProvider = new StringSignerSecretProvider();
Properties secretProviderProps = new Properties();
secretProviderProps.setProperty(AuthenticationFilter.SIGNATURE_SECRET, "secret");
secretProvider.init(secretProviderProps, null, -1);
Signer signer = new Signer(secretProvider);
String tokenSigned = signer.sign(token.toString());
url = new URL(TestJettyHelper.getJettyURL(),

View File

@ -461,6 +461,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7059. HAadmin transtionToActive with forceActive option can show
confusing message.
HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
via cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -653,6 +656,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7032. Add WebHDFS support for reading and writing to encryption zones.
(clamb via wang)
HDFS-6965. NN continues to issue block locations for DNs with full disks.
(Rushabh Shah via kihwal)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -88,6 +88,10 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@ -356,13 +360,23 @@ public DatanodeInfo load(DatanodeInfo key) throws Exception {
/** Append on an existing block? */
private final boolean isAppend;
private final Span traceSpan;
/**
* Default construction for file create
*/
private DataStreamer(HdfsFileStatus stat) {
this(stat, null);
}
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false;
isLazyPersistFile = stat.isLazyPersist();
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
/**
@ -373,9 +387,10 @@ private DataStreamer(HdfsFileStatus stat) {
* @throws IOException if error occurs
*/
private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
int bytesPerChecksum, Span span) throws IOException {
isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
traceSpan = span;
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
@ -466,6 +481,10 @@ private void endBlock() {
@Override
public void run() {
long lastPacket = Time.now();
TraceScope traceScope = null;
if (traceSpan != null) {
traceScope = Trace.continueSpan(traceSpan);
}
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
@ -639,6 +658,9 @@ public void run() {
}
}
}
if (traceScope != null) {
traceScope.close();
}
closeInternal();
}
@ -1614,7 +1636,11 @@ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer(stat);
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
streamer = new DataStreamer(stat, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@ -1655,15 +1681,21 @@ private DFSOutputStream(DFSClient dfsClient, String src,
this(dfsClient, src, progress, stat, checksum);
initialFileSize = stat.getLen(); // length of file when opened
Span traceSpan = null;
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
// The last partial block of the file has to be filled.
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
streamer = new DataStreamer(lastBlock, stat,
checksum.getBytesPerChecksum(), traceSpan);
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer(stat);
streamer = new DataStreamer(stat, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}

View File

@ -25,12 +25,16 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceInfo;
import org.htrace.TraceScope;
/**
* Static utilities for dealing with the protocol buffers used by the
@ -78,9 +82,41 @@ static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken) {
return BaseHeaderProto.newBuilder()
BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder()
.setBlock(PBHelper.convert(blk))
.setToken(PBHelper.convert(blockToken))
.build();
.setToken(PBHelper.convert(blockToken));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
.setTraceId(s.getTraceId())
.setParentId(s.getSpanId()));
}
return builder.build();
}
public static TraceInfo fromProto(DataTransferTraceInfoProto proto) {
if (proto == null) return null;
if (!proto.hasTraceId()) return null;
return new TraceInfo(proto.getTraceId(), proto.getParentId());
}
public static TraceScope continueTraceSpan(ClientOperationHeaderProto header,
String description) {
return continueTraceSpan(header.getBaseHeader(), description);
}
public static TraceScope continueTraceSpan(BaseHeaderProto header,
String description) {
return continueTraceSpan(header.getTraceInfo(), description);
}
public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto,
String description) {
TraceScope scope = null;
TraceInfo info = fromProto(proto);
if (info != null) {
scope = Trace.startSpan(description, info);
}
return scope;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol.datatransfer;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.DataInputStream;
@ -39,6 +40,7 @@
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.htrace.TraceScope;
/** Receiver */
@InterfaceAudience.Private
@ -108,7 +110,10 @@ static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy)
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
proto.getOffset(),
@ -117,28 +122,37 @@ private void opReadBlock() throws IOException {
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive {@link Op#TRANSFER_BLOCK} */
@ -146,11 +160,17 @@ private void opTransferBlock(DataInputStream in) throws IOException {
final OpTransferBlockProto proto =
OpTransferBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@ -159,9 +179,15 @@ private void opRequestShortCircuitFds(DataInputStream in) throws IOException {
OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in));
SlotId slotId = (proto.hasSlotId()) ?
PBHelper.convert(proto.getSlotId()) : null;
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion());
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion());
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */
@ -169,38 +195,67 @@ private void opReleaseShortCircuitFds(DataInputStream in)
throws IOException {
final ReleaseShortCircuitAccessRequestProto proto =
ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in));
releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
proto.getClass().getSimpleName());
try {
releaseShortCircuitFds(PBHelper.convert(proto.getSlotId()));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */
private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
final ShortCircuitShmRequestProto proto =
ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in));
requestShortCircuitShm(proto.getClientName());
TraceScope traceScope = continueTraceSpan(proto.getTraceInfo(),
proto.getClass().getSimpleName());
try {
requestShortCircuitShm(proto.getClientName());
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(),
PBHelper.convert(proto.getSource()));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
copyBlock(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()));
} finally {
if (traceScope != null) traceScope.close();
}
}
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
blockChecksum(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()));
} finally {
if (traceScope != null) traceScope.close();
}
}
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
@ -47,6 +48,9 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.htrace.Trace;
import org.htrace.Span;
import com.google.protobuf.Message;
/** Sender */
@ -187,19 +191,29 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
@Override
public void releaseShortCircuitFds(SlotId slotId) throws IOException {
ReleaseShortCircuitAccessRequestProto proto =
ReleaseShortCircuitAccessRequestProto.Builder builder =
ReleaseShortCircuitAccessRequestProto.newBuilder().
setSlotId(PBHelper.convert(slotId)).
build();
setSlotId(PBHelper.convert(slotId));
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
.setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
}
ReleaseShortCircuitAccessRequestProto proto = builder.build();
send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto);
}
@Override
public void requestShortCircuitShm(String clientName) throws IOException {
ShortCircuitShmRequestProto proto =
ShortCircuitShmRequestProto.Builder builder =
ShortCircuitShmRequestProto.newBuilder().
setClientName(clientName).
build();
setClientName(clientName);
if (Trace.isTracing()) {
Span s = Trace.currentSpan();
builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder()
.setTraceId(s.getTraceId()).setParentId(s.getSpanId()));
}
ShortCircuitShmRequestProto proto = builder.build();
send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto);
}

View File

@ -635,7 +635,7 @@ private boolean isGoodTarget(DatanodeStorageInfo storage,
final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * node.getBlocksScheduled();
if (requiredSize > node.getRemaining() - scheduledSize) {
if (requiredSize > storage.getRemaining() - scheduledSize) {
logNodeIsNotChosen(storage, "the node does not have enough space ");
return false;
}

View File

@ -47,6 +47,12 @@ message DataTransferEncryptorMessageProto {
message BaseHeaderProto {
required ExtendedBlockProto block = 1;
optional hadoop.common.TokenProto token = 2;
optional DataTransferTraceInfoProto traceInfo = 3;
}
message DataTransferTraceInfoProto {
required uint64 traceId = 1;
required uint64 parentId = 2;
}
message ClientOperationHeaderProto {
@ -173,6 +179,7 @@ message OpRequestShortCircuitAccessProto {
message ReleaseShortCircuitAccessRequestProto {
required ShortCircuitShmSlotProto slotId = 1;
optional DataTransferTraceInfoProto traceInfo = 2;
}
message ReleaseShortCircuitAccessResponseProto {
@ -184,6 +191,7 @@ message ShortCircuitShmRequestProto {
// The name of the client requesting the shared memory segment. This is
// purely for logging / debugging purposes.
required string clientName = 1;
optional DataTransferTraceInfoProto traceInfo = 2;
}
message ShortCircuitShmResponseProto {

View File

@ -35,17 +35,24 @@
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -599,5 +606,53 @@ public void testSafeModeIBRAfterIncremental() throws Exception {
new BlockListAsLongs(null, null));
assertEquals(1, ds.getBlockReportCount());
}
/**
* Tests that a namenode doesn't choose a datanode with full disks to
* store blocks.
* @throws Exception
*/
@Test
public void testStorageWithRemainingCapacity() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = FileSystem.get(conf);
Path file1 = null;
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().
get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem,
nodeReg);
// By default, MiniDFSCluster will create 1 datanode with 2 storages.
// Assigning 64k for remaining storage capacity and will
//create a file with 100k.
for(DatanodeStorageInfo storage: dd.getStorageInfos()) {
storage.setUtilizationForTesting(65536, 0, 65536, 0);
}
//sum of the remaining capacity of both the storages
dd.setRemaining(131072);
file1 = new Path("testRemainingStorage.dat");
try {
DFSTestUtil.createFile(fs, file1, 102400, 102400, 102400, (short)1,
0x1BAD5EED);
}
catch (RemoteException re) {
GenericTestUtils.assertExceptionContains("nodes instead of "
+ "minReplication", re);
}
}
finally {
// Clean up
assertTrue(fs.exists(file1));
fs.delete(file1, true);
assertTrue(!fs.exists(file1));
cluster.shutdown();
}
}
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.htrace.HTraceConfiguration;
import org.htrace.Sampler;
import org.htrace.Span;
@ -39,11 +40,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
public class TestTracing {
@ -81,7 +84,12 @@ public void testWriteTraceHooks() throws Exception {
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.create",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.fsync",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete"
"org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.complete",
"DFSOutputStream",
"OpWriteBlockProto",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.addBlock"
};
assertSpanNamesFound(expectedSpanNames);
@ -96,7 +104,7 @@ public void testWriteTraceHooks() throws Exception {
// There should only be one trace id as it should all be homed in the
// top trace.
for (Span span : SetSpanReceiver.SetHolder.spans) {
for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
}
}
@ -152,7 +160,8 @@ public void testReadTraceHooks() throws Exception {
String[] expectedSpanNames = {
"testReadTraceHooks",
"org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations"
"org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol.BlockingInterface.getBlockLocations",
"OpReadBlockProto"
};
assertSpanNamesFound(expectedSpanNames);
@ -168,7 +177,7 @@ public void testReadTraceHooks() throws Exception {
// There should only be one trace id as it should all be homed in the
// top trace.
for (Span span : SetSpanReceiver.SetHolder.spans) {
for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
}
}
@ -228,10 +237,24 @@ public static void shutDown() throws IOException {
cluster.shutdown();
}
private void assertSpanNamesFound(String[] expectedSpanNames) {
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
for (String spanName : expectedSpanNames) {
Assert.assertTrue("Should find a span with name " + spanName, map.get(spanName) != null);
static void assertSpanNamesFound(final String[] expectedSpanNames) {
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
for (String spanName : expectedSpanNames) {
if (!map.containsKey(spanName)) {
return false;
}
}
return true;
}
}, 100, 1000);
} catch (TimeoutException e) {
Assert.fail("timed out to get expected spans: " + e.getMessage());
} catch (InterruptedException e) {
Assert.fail("interrupted while waiting spans: " + e.getMessage());
}
}
@ -249,15 +272,16 @@ public void configure(HTraceConfiguration conf) {
}
public void receiveSpan(Span span) {
SetHolder.spans.add(span);
SetHolder.spans.put(span.getSpanId(), span);
}
public void close() {
}
public static class SetHolder {
public static Set<Span> spans = new HashSet<Span>();
public static ConcurrentHashMap<Long, Span> spans =
new ConcurrentHashMap<Long, Span>();
public static int size() {
return spans.size();
}
@ -265,7 +289,7 @@ public static int size() {
public static Map<String, List<Span>> getMap() {
Map<String, List<Span>> map = new HashMap<String, List<Span>>();
for (Span s : spans) {
for (Span s : spans.values()) {
List<Span> l = map.get(s.getDescription());
if (l == null) {
l = new LinkedList<Span>();

View File

@ -0,0 +1,97 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tracing;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
public class TestTracingShortCircuitLocalRead {
private static Configuration conf;
private static MiniDFSCluster cluster;
private static DistributedFileSystem dfs;
private static SpanReceiverHost spanReceiverHost;
private static TemporarySocketDirectory sockDir;
static final Path TEST_PATH = new Path("testShortCircuitTraceHooks");
static final int TEST_LENGTH = 1234;
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
@AfterClass
public static void shutdown() throws IOException {
sockDir.close();
}
@Test
public void testShortCircuitTraceHooks() throws IOException {
conf = new Configuration();
conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY,
TestTracing.SetSpanReceiver.class.getName());
conf.setLong("dfs.blocksize", 100 * 1024);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
"testShortCircuitTraceHooks._PORT");
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.build();
dfs = cluster.getFileSystem();
try {
spanReceiverHost = SpanReceiverHost.getInstance(conf);
DFSTestUtil.createFile(dfs, TEST_PATH, TEST_LENGTH, (short)1, 5678L);
TraceScope ts = Trace.startSpan("testShortCircuitTraceHooks", Sampler.ALWAYS);
FSDataInputStream stream = dfs.open(TEST_PATH);
byte buf[] = new byte[TEST_LENGTH];
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
stream.close();
ts.close();
String[] expectedSpanNames = {
"OpRequestShortCircuitAccessProto",
"ShortCircuitShmRequestProto"
};
TestTracing.assertSpanNamesFound(expectedSpanNames);
} finally {
dfs.close();
cluster.shutdown();
}
}
}

View File

@ -849,6 +849,17 @@
<artifactId>xercesImpl</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -21,46 +21,37 @@
public class Constants {
// s3 access key
public static final String OLD_ACCESS_KEY = "fs.s3a.awsAccessKeyId";
public static final String NEW_ACCESS_KEY = "fs.s3a.access.key";
public static final String ACCESS_KEY = "fs.s3a.access.key";
// s3 secret key
public static final String OLD_SECRET_KEY = "fs.s3a.awsSecretAccessKey";
public static final String NEW_SECRET_KEY = "fs.s3a.secret.key";
public static final String SECRET_KEY = "fs.s3a.secret.key";
// number of simultaneous connections to s3
public static final String OLD_MAXIMUM_CONNECTIONS = "fs.s3a.maxConnections";
public static final String NEW_MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final String MAXIMUM_CONNECTIONS = "fs.s3a.connection.maximum";
public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
// connect to s3 over ssl?
public static final String OLD_SECURE_CONNECTIONS = "fs.s3a.secureConnections";
public static final String NEW_SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
// number of times we should retry errors
public static final String OLD_MAX_ERROR_RETRIES = "fs.s3a.maxErrorRetries";
public static final String NEW_MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
public static final String MAX_ERROR_RETRIES = "fs.s3a.attempts.maximum";
public static final int DEFAULT_MAX_ERROR_RETRIES = 10;
// seconds until we give up on a connection to s3
public static final String OLD_SOCKET_TIMEOUT = "fs.s3a.socketTimeout";
public static final String NEW_SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
public static final String SOCKET_TIMEOUT = "fs.s3a.connection.timeout";
public static final int DEFAULT_SOCKET_TIMEOUT = 50000;
// number of records to get while paging through a directory listing
public static final String OLD_MAX_PAGING_KEYS = "fs.s3a.maxPagingKeys";
public static final String NEW_MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final String MAX_PAGING_KEYS = "fs.s3a.paging.maximum";
public static final int DEFAULT_MAX_PAGING_KEYS = 5000;
// size of each of or multipart pieces in bytes
public static final String OLD_MULTIPART_SIZE = "fs.s3a.multipartSize";
public static final String NEW_MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";
public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
// minimum size in bytes before we start a multipart uploads or copy
public static final String OLD_MIN_MULTIPART_THRESHOLD = "fs.s3a.minMultipartSize";
public static final String NEW_MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
public static final int DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
// comma separated list of directories
@ -68,18 +59,15 @@ public class Constants {
// private | public-read | public-read-write | authenticated-read |
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
public static final String OLD_CANNED_ACL = "fs.s3a.cannedACL";
public static final String NEW_CANNED_ACL = "fs.s3a.acl.default";
public static final String CANNED_ACL = "fs.s3a.acl.default";
public static final String DEFAULT_CANNED_ACL = "";
// should we try to purge old multipart uploads when starting up
public static final String OLD_PURGE_EXISTING_MULTIPART = "fs.s3a.purgeExistingMultiPart";
public static final String NEW_PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
// purge any multipart uploads older than this number of seconds
public static final String OLD_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.purgeExistingMultiPartAge";
public static final String NEW_PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
// s3 server-side encryption

View File

@ -95,8 +95,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
this.getWorkingDirectory());
// Try to get our credentials or just connect anonymously
String accessKey = conf.get(NEW_ACCESS_KEY, conf.get(OLD_ACCESS_KEY, null));
String secretKey = conf.get(NEW_SECRET_KEY, conf.get(OLD_SECRET_KEY, null));
String accessKey = conf.get(ACCESS_KEY, null);
String secretKey = conf.get(SECRET_KEY, null);
String userInfo = name.getUserInfo();
if (userInfo != null) {
@ -118,37 +118,33 @@ public void initialize(URI name, Configuration conf) throws IOException {
bucket = name.getHost();
ClientConfiguration awsConf = new ClientConfiguration();
awsConf.setMaxConnections(conf.getInt(NEW_MAXIMUM_CONNECTIONS,
conf.getInt(OLD_MAXIMUM_CONNECTIONS, DEFAULT_MAXIMUM_CONNECTIONS)));
awsConf.setProtocol(conf.getBoolean(NEW_SECURE_CONNECTIONS,
conf.getBoolean(OLD_SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) ?
Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(conf.getInt(NEW_MAX_ERROR_RETRIES,
conf.getInt(OLD_MAX_ERROR_RETRIES, DEFAULT_MAX_ERROR_RETRIES)));
awsConf.setSocketTimeout(conf.getInt(NEW_SOCKET_TIMEOUT,
conf.getInt(OLD_SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT)));
awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
DEFAULT_MAXIMUM_CONNECTIONS));
awsConf.setProtocol(conf.getBoolean(SECURE_CONNECTIONS,
DEFAULT_SECURE_CONNECTIONS) ? Protocol.HTTPS : Protocol.HTTP);
awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
DEFAULT_MAX_ERROR_RETRIES));
awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
DEFAULT_SOCKET_TIMEOUT));
s3 = new AmazonS3Client(credentials, awsConf);
maxKeys = conf.getInt(NEW_MAX_PAGING_KEYS,
conf.getInt(OLD_MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS));
partSize = conf.getLong(NEW_MULTIPART_SIZE,
conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
if (partSize < 5 * 1024 * 1024) {
LOG.error(NEW_MULTIPART_SIZE + " must be at least 5 MB");
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
}
if (partSizeThreshold < 5 * 1024 * 1024) {
LOG.error(NEW_MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
partSizeThreshold = 5 * 1024 * 1024;
}
String cannedACLName = conf.get(NEW_CANNED_ACL,
conf.get(OLD_CANNED_ACL, DEFAULT_CANNED_ACL));
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
if (!cannedACLName.isEmpty()) {
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
} else {
@ -159,10 +155,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
throw new IOException("Bucket " + bucket + " does not exist");
}
boolean purgeExistingMultipart = conf.getBoolean(NEW_PURGE_EXISTING_MULTIPART,
conf.getBoolean(OLD_PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART));
long purgeExistingMultipartAge = conf.getLong(NEW_PURGE_EXISTING_MULTIPART_AGE,
conf.getLong(OLD_PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE));
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
if (purgeExistingMultipart) {
TransferManager transferManager = new TransferManager(s3);

View File

@ -75,10 +75,8 @@ public S3AOutputStream(Configuration conf, AmazonS3Client client,
this.statistics = statistics;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = conf.getLong(NEW_MULTIPART_SIZE,
conf.getLong(OLD_MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE));
partSizeThreshold = conf.getInt(NEW_MIN_MULTIPART_THRESHOLD,
conf.getInt(OLD_MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD));
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
if (conf.get(BUFFER_DIR, null) != null) {
lDirAlloc = new LocalDirAllocator(BUFFER_DIR);

View File

@ -84,6 +84,10 @@ Release 2.6.0 - UNRELEASED
failures should be ignored towards counting max-attempts. (Xuan Gong via
vinodkv)
YARN-2531. Added a configuration for admins to be able to override app-configs
and enforce/not-enforce strict control of per-container cpu usage. (Varun
Vasudev via vinodkv)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
@ -225,6 +229,9 @@ Release 2.6.0 - UNRELEASED
YARN-2547. Cross Origin Filter throws UnsupportedOperationException upon
destroy (Mit Desai via jeagles)
YARN-2557. Add a parameter "attempt_Failures_Validity_Interval" into
DistributedShell (xgong)
OPTIMIZATIONS
BUG FIXES

View File

@ -902,6 +902,16 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH =
NM_PREFIX + "linux-container-executor.cgroups.mount-path";
/**
* Whether the apps should run in strict resource usage mode(not allowed to
* use spare CPU)
*/
public static final String NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
NM_PREFIX + "linux-container-executor.cgroups.strict-resource-usage";
public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
false;
/**
* Interval of time the linux container executor should try cleaning up

View File

@ -75,7 +75,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Client for Distributed Shell application submission to YARN.
@ -163,6 +162,8 @@ public class Client {
// flag to indicate whether to keep containers across application attempts.
private boolean keepContainers = false;
private long attemptFailuresValidityInterval = -1;
// Debug flag
boolean debugFlag = false;
@ -248,6 +249,12 @@ public Client(Configuration conf) throws Exception {
" If the flag is true, running containers will not be killed when" +
" application attempt fails and these containers will be retrieved by" +
" the new application attempt ");
opts.addOption("attempt_failures_validity_interval", true,
"when attempt_failures_validity_interval in milliseconds is set to > 0," +
"the failure number will not take failures which happen out of " +
"the validityInterval into failure count. " +
"If failure count reaches to maxAppAttempts, " +
"the application will be failed.");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@ -372,6 +379,10 @@ public boolean init(String[] args) throws ParseException {
clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
attemptFailuresValidityInterval =
Long.parseLong(cliParser.getOptionValue(
"attempt_failures_validity_interval", "-1"));
log4jPropFile = cliParser.getOptionValue("log_properties", "");
return true;
@ -456,6 +467,11 @@ public boolean run() throws IOException, YarnException {
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
if (attemptFailuresValidityInterval >= 0) {
appContext
.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.applications.distributedshell;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class TestDSSleepingAppMaster extends ApplicationMaster{
private static final Log LOG = LogFactory.getLog(TestDSSleepingAppMaster.class);
private static final long SLEEP_TIME = 5000;
public static void main(String[] args) {
boolean result = false;
try {
TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
}
appMaster.run();
if (appMaster.appAttemptID.getAttemptId() <= 2) {
try {
// sleep some time
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {}
// fail the first am.
System.exit(100);
}
result = appMaster.finish();
} catch (Throwable t) {
System.exit(1);
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
} else {
LOG.info("Application Master failed. exiting");
System.exit(2);
}
}
}

View File

@ -308,6 +308,82 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception {
Assert.assertTrue(result);
}
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
* Set attempt_failures_validity_interval as 2.5 seconds. It will check
* how many attempt failures for previous 2.5 seconds.
* The application is expected to be successful.
*/
@Test(timeout=90000)
public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
"sleep 8",
"--master_memory",
"512",
"--container_memory",
"128",
"--attempt_failures_validity_interval",
"2500"
};
LOG.info("Initializing DS Client");
Configuration conf = yarnCluster.getConfig();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
Client client = new Client(TestDSSleepingAppMaster.class.getName(),
new Configuration(conf));
client.init(args);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
// application should succeed
Assert.assertTrue(result);
}
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
* Set attempt_failures_validity_interval as 15 seconds. It will check
* how many attempt failure for previous 15 seconds.
* The application is expected to be fail.
*/
@Test(timeout=90000)
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
String[] args = {
"--jar",
APPMASTER_JAR,
"--num_containers",
"1",
"--shell_command",
"sleep 8",
"--master_memory",
"512",
"--container_memory",
"128",
"--attempt_failures_validity_interval",
"15000"
};
LOG.info("Initializing DS Client");
Configuration conf = yarnCluster.getConfig();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
Client client = new Client(TestDSSleepingAppMaster.class.getName(),
new Configuration(conf));
client.init(args);
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
// application should be failed
Assert.assertFalse(result);
}
@Test(timeout=90000)
public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir =

View File

@ -1039,6 +1039,16 @@
<value>^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$</value>
</property>
<property>
<description>This flag determines whether apps should run with strict resource limits
or be allowed to consume spare resources if they need them. For example, turning the
flag on will restrict apps to use only their share of CPU, even if the node has spare
CPU cycles. The default value is false i.e. use available resources. Please note that
turning this flag on may reduce job throughput on the cluster.</description>
<name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
<value>false</value>
</property>
<property>
<description>T-file compression types used to compress aggregated logs.</description>
<name>yarn.nodemanager.log-aggregation.compression-type</name>

View File

@ -57,6 +57,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private String cgroupMountPath;
private boolean cpuWeightEnabled = true;
private boolean strictResourceUsageMode = false;
private final String MTAB_FILE = "/proc/mounts";
private final String CGROUPS_FSTYPE = "cgroup";
@ -71,6 +72,8 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
private long deleteCgroupTimeout;
// package private for testing purposes
Clock clock;
private float yarnProcessors;
public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>();
@ -105,6 +108,12 @@ void initConfig() throws IOException {
cgroupPrefix = cgroupPrefix.substring(1);
}
this.strictResourceUsageMode =
conf
.getBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE,
YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE);
int len = cgroupPrefix.length();
if (cgroupPrefix.charAt(len - 1) == '/') {
cgroupPrefix = cgroupPrefix.substring(0, len - 1);
@ -132,8 +141,7 @@ void init(LinuxContainerExecutor lce, ResourceCalculatorPlugin plugin)
initializeControllerPaths();
// cap overall usage to the number of cores allocated to YARN
float yarnProcessors =
NodeManagerHardwareUtils.getContainersCores(plugin, conf);
yarnProcessors = NodeManagerHardwareUtils.getContainersCores(plugin, conf);
int systemProcessors = plugin.getNumProcessors();
if (systemProcessors != (int) yarnProcessors) {
LOG.info("YARN containers restricted to " + yarnProcessors + " cores");
@ -290,10 +298,25 @@ private void setupLimits(ContainerId containerId,
String containerName = containerId.toString();
if (isCpuWeightEnabled()) {
int containerVCores = containerResource.getVirtualCores();
createCgroup(CONTROLLER_CPU, containerName);
int cpuShares = CPU_DEFAULT_WEIGHT * containerResource.getVirtualCores();
int cpuShares = CPU_DEFAULT_WEIGHT * containerVCores;
updateCgroup(CONTROLLER_CPU, containerName, "shares",
String.valueOf(cpuShares));
if (strictResourceUsageMode) {
int nodeVCores =
conf.getInt(YarnConfiguration.NM_VCORES,
YarnConfiguration.DEFAULT_NM_VCORES);
if (nodeVCores != containerVCores) {
float containerCPU =
(containerVCores * yarnProcessors) / (float) nodeVCores;
int[] limits = getOverallLimits(containerCPU);
updateCgroup(CONTROLLER_CPU, containerName, CPU_PERIOD_US,
String.valueOf(limits[0]));
updateCgroup(CONTROLLER_CPU, containerName, CPU_QUOTA_US,
String.valueOf(limits[1]));
}
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager.util;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
import org.junit.Assert;
@ -86,9 +88,13 @@ static class CustomCgroupsLCEResourceHandler extends
String mtabFile;
int[] limits = new int[2];
boolean generateLimitsMode = false;
@Override
int[] getOverallLimits(float x) {
if (generateLimitsMode == true) {
return super.getOverallLimits(x);
}
return limits;
}
@ -116,32 +122,11 @@ public void testInit() throws IOException {
handler.initConfig();
// create mock cgroup
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
File cgroupDir = createMockCgroup();
File cgroupMountDir = createMockCgroupMount(cgroupDir);
// create mock mtab
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
File mockMtab = createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
@ -156,7 +141,8 @@ public void testInit() throws IOException {
Assert.assertFalse(quotaFile.exists());
// subset of cpu being used, files should be created
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 75);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
@ -166,7 +152,8 @@ public void testInit() throws IOException {
Assert.assertEquals(1000 * 1000, quota);
// set cpu back to 100, quota should be -1
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 100);
conf.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
100);
handler.limits[0] = 100 * 1000;
handler.limits[1] = 1000 * 1000;
handler.init(mockLCE, plugin);
@ -213,4 +200,130 @@ public void testGetOverallLimits() {
Assert.assertEquals(1000 * 1000, ret[0]);
Assert.assertEquals(-1, ret[1]);
}
private File createMockCgroup() throws IOException {
File cgroupDir = new File("target", UUID.randomUUID().toString());
if (!cgroupDir.mkdir()) {
String message = "Could not create dir " + cgroupDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupDir;
}
private File createMockCgroupMount(File cgroupDir) throws IOException {
File cgroupMountDir = new File(cgroupDir.getAbsolutePath(), "hadoop-yarn");
if (!cgroupMountDir.mkdir()) {
String message =
"Could not create dir " + cgroupMountDir.getAbsolutePath();
throw new IOException(message);
}
return cgroupMountDir;
}
private File createMockMTab(File cgroupDir) throws IOException {
String mtabContent =
"none " + cgroupDir.getAbsolutePath() + " cgroup rw,relatime,cpu 0 0";
File mockMtab = new File("target", UUID.randomUUID().toString());
if (!mockMtab.exists()) {
if (!mockMtab.createNewFile()) {
String message = "Could not create file " + mockMtab.getAbsolutePath();
throw new IOException(message);
}
}
FileWriter mtabWriter = new FileWriter(mockMtab.getAbsoluteFile());
mtabWriter.write(mtabContent);
mtabWriter.close();
mockMtab.deleteOnExit();
return mockMtab;
}
@Test
public void testContainerLimits() throws IOException {
LinuxContainerExecutor mockLCE = new MockLinuxContainerExecutor();
CustomCgroupsLCEResourceHandler handler =
new CustomCgroupsLCEResourceHandler();
handler.generateLimitsMode = true;
YarnConfiguration conf = new YarnConfiguration();
final int numProcessors = 4;
ResourceCalculatorPlugin plugin =
Mockito.mock(ResourceCalculatorPlugin.class);
Mockito.doReturn(numProcessors).when(plugin).getNumProcessors();
handler.setConf(conf);
handler.initConfig();
// create mock cgroup
File cgroupDir = createMockCgroup();
File cgroupMountDir = createMockCgroupMount(cgroupDir);
// create mock mtab
File mockMtab = createMockMTab(cgroupDir);
// setup our handler and call init()
handler.setMtabFile(mockMtab.getAbsolutePath());
handler.init(mockLCE, plugin);
// check values
// default case - files shouldn't exist, strict mode off by default
ContainerId id = ContainerId.fromString("container_1_1_1_1");
handler.preExecute(id, Resource.newInstance(1024, 1));
File containerDir = new File(cgroupMountDir, id.toString());
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
File periodFile = new File(containerDir, "cpu.cfs_period_us");
File quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// no files created because we're using all cpu
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertFalse(periodFile.exists());
Assert.assertFalse(quotaFile.exists());
// 50% of CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
handler.initConfig();
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(500 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
// CGroups set to 50% of CPU, container set to 50% of YARN CPU
FileUtils.deleteQuietly(containerDir);
conf.setBoolean(
YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE, true);
conf
.setInt(YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT, 50);
handler.initConfig();
handler.init(mockLCE, plugin);
handler.preExecute(id,
Resource.newInstance(1024, YarnConfiguration.DEFAULT_NM_VCORES / 2));
Assert.assertTrue(containerDir.exists());
Assert.assertTrue(containerDir.isDirectory());
periodFile = new File(containerDir, "cpu.cfs_period_us");
quotaFile = new File(containerDir, "cpu.cfs_quota_us");
Assert.assertTrue(periodFile.exists());
Assert.assertTrue(quotaFile.exists());
Assert.assertEquals(1000 * 1000, readIntFromFile(periodFile));
Assert.assertEquals(1000 * 1000, readIntFromFile(quotaFile));
FileUtils.deleteQuietly(cgroupDir);
}
}