diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 1f01fd8c34..18758515b4 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -396,6 +396,12 @@ Trunk (Unreleased) HADOOP-10840. Fix OutOfMemoryError caused by metrics system in Azure File System. (Shanyu Zhao via cnauroth) + HADOOP-10826. Iteration on KeyProviderFactory.serviceLoader is + thread-unsafe. (benoyantony viat tucu) + + HADOOP-10881. Clarify usage of encryption and encrypted encryption + key in KeyProviderCryptoExtension. (wang) + OPTIMIZATIONS HADOOP-7761. Improve the performance of raw comparisons. (todd) @@ -444,6 +450,8 @@ Release 2.6.0 - UNRELEASED HADOOP-10755. Support negative caching of user-group mapping. (Lei Xu via wang) + HADOOP-10855. Allow Text to be read with a known Length. (todd) + OPTIMIZATIONS BUG FIXES @@ -794,6 +802,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10710. hadoop.auth cookie is not properly constructed according to RFC2109. (Juan Yu via tucu) + HADOOP-10864. Tool documentenation is broken. (Akira Ajisaka + via Arpit Agarwal) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java index e4b822d2c6..0ba73f1519 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.security.SecureRandom; - import javax.crypto.Cipher; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; @@ -30,51 +29,109 @@ import org.apache.hadoop.classification.InterfaceAudience; /** - * A KeyProvider with Cytographic Extensions specifically for generating - * Encrypted Keys as well as decrypting them + * A KeyProvider with Cryptographic Extensions specifically for generating + * and decrypting encrypted encryption keys. * */ @InterfaceAudience.Private public class KeyProviderCryptoExtension extends KeyProviderExtension { + /** + * Designates an encrypted encryption key, or EEK. + */ public static final String EEK = "EEK"; + /** + * Designates a decrypted encrypted encryption key, that is, an encryption key + * (EK). + */ public static final String EK = "EK"; /** - * This is a holder class whose instance contains the keyVersionName, iv - * used to generate the encrypted Key and the encrypted KeyVersion + * An encrypted encryption key (EEK) and related information. An EEK must be + * decrypted using the key's encryption key before it can be used. */ public static class EncryptedKeyVersion { - private String keyName; - private String keyVersionName; - private byte[] iv; - private KeyVersion encryptedKey; + private String encryptionKeyName; + private String encryptionKeyVersionName; + private byte[] encryptedKeyIv; + private KeyVersion encryptedKeyVersion; - protected EncryptedKeyVersion(String keyName, String keyVersionName, - byte[] iv, KeyVersion encryptedKey) { - this.keyName = keyName; - this.keyVersionName = keyVersionName; - this.iv = iv; - this.encryptedKey = encryptedKey; + /** + * Create a new EncryptedKeyVersion. 
+ * + * @param keyName Name of the encryption key used to + * encrypt the encrypted key. + * @param encryptionKeyVersionName Version name of the encryption key used + * to encrypt the encrypted key. + * @param encryptedKeyIv Initialization vector of the encrypted + * key. The IV of the encryption key used to + * encrypt the encrypted key is derived from + * this IV. + * @param encryptedKeyVersion The encrypted encryption key version. + */ + protected EncryptedKeyVersion(String keyName, + String encryptionKeyVersionName, byte[] encryptedKeyIv, + KeyVersion encryptedKeyVersion) { + this.encryptionKeyName = keyName; + this.encryptionKeyVersionName = encryptionKeyVersionName; + this.encryptedKeyIv = encryptedKeyIv; + this.encryptedKeyVersion = encryptedKeyVersion; } - public String getKeyName() { - return keyName; + /** + * @return Name of the encryption key used to encrypt the encrypted key. + */ + public String getEncryptionKeyName() { + return encryptionKeyName; } - public String getKeyVersionName() { - return keyVersionName; + /** + * @return Version name of the encryption key used to encrypt the encrypted + * key. + */ + public String getEncryptionKeyVersionName() { + return encryptionKeyVersionName; } - public byte[] getIv() { - return iv; + /** + * @return Initialization vector of the encrypted key. The IV of the + * encryption key used to encrypt the encrypted key is derived from this + * IV. + */ + public byte[] getEncryptedKeyIv() { + return encryptedKeyIv; } - public KeyVersion getEncryptedKey() { - return encryptedKey; + /** + * @return The encrypted encryption key version. + */ + public KeyVersion getEncryptedKeyVersion() { + return encryptedKeyVersion; } + /** + * Derive the initialization vector (IV) for the encryption key from the IV + * of the encrypted key. This derived IV is used with the encryption key to + * decrypt the encrypted key. + *

+ * The alternative to this is using the same IV for both the encryption key + * and the encrypted key. Even a simple symmetric transformation like this + * improves security by avoiding IV re-use. IVs will also be fairly unique + * among different EEKs. + * + * @param encryptedKeyIV of the encrypted key (i.e. {@link + * #getEncryptedKeyIv()}) + * @return IV for the encryption key + */ + protected static byte[] deriveIV(byte[] encryptedKeyIV) { + byte[] rIv = new byte[encryptedKeyIV.length]; + // Do a simple XOR transformation to flip all the bits + for (int i = 0; i < encryptedKeyIV.length; i++) { + rIv[i] = (byte) (encryptedKeyIV[i] ^ 0xff); + } + return rIv; + } } /** @@ -141,53 +198,56 @@ private DefaultCryptoExtension(KeyProvider keyProvider) { this.keyProvider = keyProvider; } - // the IV used to encrypt a EK typically will be the same IV used to - // encrypt data with the EK. To avoid any chance of weakening the - // encryption because the same IV is used, we simply XOR the IV thus we - // are not using the same IV for 2 different encryptions (even if they - // are done using different keys) - private byte[] flipIV(byte[] iv) { - byte[] rIv = new byte[iv.length]; - for (int i = 0; i < iv.length; i++) { - rIv[i] = (byte) (iv[i] ^ 0xff); - } - return rIv; - } - @Override public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName) throws IOException, GeneralSecurityException { - KeyVersion keyVer = keyProvider.getCurrentKey(encryptionKeyName); - Preconditions.checkNotNull(keyVer, "No KeyVersion exists for key '%s' ", - encryptionKeyName); - byte[] newKey = new byte[keyVer.getMaterial().length]; - SecureRandom.getInstance("SHA1PRNG").nextBytes(newKey); + // Fetch the encryption key + KeyVersion encryptionKey = keyProvider.getCurrentKey(encryptionKeyName); + Preconditions.checkNotNull(encryptionKey, + "No KeyVersion exists for key '%s' ", encryptionKeyName); + // Generate random bytes for new key and IV Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); - byte[] iv = SecureRandom.getSeed(cipher.getBlockSize()); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(keyVer.getMaterial(), - "AES"), new IvParameterSpec(flipIV(iv))); - byte[] ek = cipher.doFinal(newKey); + SecureRandom random = SecureRandom.getInstance("SHA1PRNG"); + final byte[] newKey = new byte[encryptionKey.getMaterial().length]; + random.nextBytes(newKey); + final byte[] iv = random.generateSeed(cipher.getBlockSize()); + // Encryption key IV is derived from new key's IV + final byte[] encryptionIV = EncryptedKeyVersion.deriveIV(iv); + // Encrypt the new key + cipher.init(Cipher.ENCRYPT_MODE, + new SecretKeySpec(encryptionKey.getMaterial(), "AES"), + new IvParameterSpec(encryptionIV)); + final byte[] encryptedKey = cipher.doFinal(newKey); return new EncryptedKeyVersion(encryptionKeyName, - keyVer.getVersionName(), iv, - new KeyVersion(keyVer.getName(), EEK, ek)); + encryptionKey.getVersionName(), iv, + new KeyVersion(encryptionKey.getName(), EEK, encryptedKey)); } @Override public KeyVersion decryptEncryptedKey( EncryptedKeyVersion encryptedKeyVersion) throws IOException, GeneralSecurityException { - KeyVersion keyVer = - keyProvider.getKeyVersion(encryptedKeyVersion.getKeyVersionName()); - Preconditions.checkNotNull(keyVer, "KeyVersion name '%s' does not exist", - encryptedKeyVersion.getKeyVersionName()); - KeyVersion keyVersion = encryptedKeyVersion.getEncryptedKey(); + // Fetch the encryption key material + final String encryptionKeyVersionName = + encryptedKeyVersion.getEncryptionKeyVersionName(); + 
final KeyVersion encryptionKey = + keyProvider.getKeyVersion(encryptionKeyVersionName); + Preconditions.checkNotNull(encryptionKey, + "KeyVersion name '%s' does not exist", encryptionKeyVersionName); + final byte[] encryptionKeyMaterial = encryptionKey.getMaterial(); + // Encryption key IV is determined from encrypted key's IV + final byte[] encryptionIV = + EncryptedKeyVersion.deriveIV(encryptedKeyVersion.getEncryptedKeyIv()); + // Init the cipher with encryption key parameters Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); cipher.init(Cipher.DECRYPT_MODE, - new SecretKeySpec(keyVersion.getMaterial(), "AES"), - new IvParameterSpec(flipIV(encryptedKeyVersion.getIv()))); - byte[] ek = - cipher.doFinal(encryptedKeyVersion.getEncryptedKey().getMaterial()); - return new KeyVersion(keyVer.getName(), EK, ek); + new SecretKeySpec(encryptionKeyMaterial, "AES"), + new IvParameterSpec(encryptionIV)); + // Decrypt the encrypted key + final KeyVersion encryptedKV = + encryptedKeyVersion.getEncryptedKeyVersion(); + final byte[] decryptedKey = cipher.doFinal(encryptedKV.getMaterial()); + return new KeyVersion(encryptionKey.getName(), EK, decryptedKey); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java index 05890dc8f5..9855bc8ed2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderFactory.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; @@ -47,6 +48,15 @@ public abstract KeyProvider createProvider(URI providerName, private static final ServiceLoader serviceLoader = ServiceLoader.load(KeyProviderFactory.class); + // Iterate through the serviceLoader to avoid lazy loading. + // Lazy loading would require synchronization in concurrent use cases. 
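+  // ServiceLoader is documented as not thread-safe, and its iterator
+  // instantiates providers lazily on first access. A single full pass in
+  // this static initializer runs under the class-initialization lock, so
+  // all providers are loaded before any caller can reach getProviders().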
+ static { + Iterator iterServices = serviceLoader.iterator(); + while (iterServices.hasNext()) { + iterServices.next(); + } + } + public static List getProviders(Configuration conf ) throws IOException { List result = new ArrayList(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 808b1bb102..06521a4359 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -646,25 +646,28 @@ public EncryptedKeyVersion generateEncryptedKey( public KeyVersion decryptEncryptedKey( EncryptedKeyVersion encryptedKeyVersion) throws IOException, GeneralSecurityException { - checkNotNull(encryptedKeyVersion.getKeyVersionName(), "versionName"); - checkNotNull(encryptedKeyVersion.getIv(), "iv"); - Preconditions.checkArgument(encryptedKeyVersion.getEncryptedKey() - .getVersionName().equals(KeyProviderCryptoExtension.EEK), + checkNotNull(encryptedKeyVersion.getEncryptionKeyVersionName(), + "versionName"); + checkNotNull(encryptedKeyVersion.getEncryptedKeyIv(), "iv"); + Preconditions.checkArgument( + encryptedKeyVersion.getEncryptedKeyVersion().getVersionName() + .equals(KeyProviderCryptoExtension.EEK), "encryptedKey version name must be '%s', is '%s'", - KeyProviderCryptoExtension.EK, encryptedKeyVersion.getEncryptedKey() - .getVersionName()); - checkNotNull(encryptedKeyVersion.getEncryptedKey(), "encryptedKey"); + KeyProviderCryptoExtension.EK, + encryptedKeyVersion.getEncryptedKeyVersion().getVersionName() + ); + checkNotNull(encryptedKeyVersion.getEncryptedKeyVersion(), "encryptedKey"); Map params = new HashMap(); params.put(KMSRESTConstants.EEK_OP, KMSRESTConstants.EEK_DECRYPT); Map jsonPayload = new HashMap(); jsonPayload.put(KMSRESTConstants.NAME_FIELD, - encryptedKeyVersion.getKeyName()); + encryptedKeyVersion.getEncryptionKeyName()); jsonPayload.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64String( - encryptedKeyVersion.getIv())); + encryptedKeyVersion.getEncryptedKeyIv())); jsonPayload.put(KMSRESTConstants.MATERIAL_FIELD, Base64.encodeBase64String( - encryptedKeyVersion.getEncryptedKey().getMaterial())); + encryptedKeyVersion.getEncryptedKeyVersion().getMaterial())); URL url = createURL(KMSRESTConstants.KEY_VERSION_RESOURCE, - encryptedKeyVersion.getKeyVersionName(), + encryptedKeyVersion.getEncryptionKeyVersionName(), KMSRESTConstants.EEK_SUB_RESOURCE, params); HttpURLConnection conn = createConnection(url, HTTP_POST); conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON_MIME); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java index e4490f1e34..3dc507687f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Text.java @@ -288,9 +288,7 @@ public String toString() { @Override public void readFields(DataInput in) throws IOException { int newLength = WritableUtils.readVInt(in); - setCapacity(newLength, false); - in.readFully(bytes, 0, newLength); - length = newLength; + readWithKnownLength(in, newLength); } public void readFields(DataInput in, int maxLength) throws IOException { @@ -302,9 +300,7 @@ public 
void readFields(DataInput in, int maxLength) throws IOException { throw new IOException("tried to deserialize " + newLength + " bytes of data, but maxLength = " + maxLength); } - setCapacity(newLength, false); - in.readFully(bytes, 0, newLength); - length = newLength; + readWithKnownLength(in, newLength); } /** Skips over one Text in the input. */ @@ -313,6 +309,17 @@ public static void skip(DataInput in) throws IOException { WritableUtils.skipFully(in, length); } + /** + * Read a Text object whose length is already known. + * This allows creating Text from a stream which uses a different serialization + * format. + */ + public void readWithKnownLength(DataInput in, int len) throws IOException { + setCapacity(len, false); + in.readFully(bytes, 0, len); + length = len; + } + /** serialize * write this object to out * length uses zero-compressed encoding diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index 2cfa243895..6ee6db769a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -883,8 +883,8 @@ protected int getWeight(Node reader, Node node) { * @param seed Used to seed the pseudo-random generator that randomizes the * set of nodes at each network distance. */ - public void sortByDistance(Node reader, Node[] nodes, - int activeLen, long seed) { + public void sortByDistance(Node reader, Node[] nodes, int activeLen, + long seed, boolean randomizeBlockLocationsPerBlock) { /** Sort weights for the nodes array */ int[] weights = new int[activeLen]; for (int i=0; i list: tree.values()) { if (list != null) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index 7243f72876..86d290abd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -279,8 +279,8 @@ protected int getWeight(Node reader, Node node) { * set of nodes at each network distance. */ @Override - public void sortByDistance( Node reader, Node[] nodes, - int activeLen, long seed) { + public void sortByDistance(Node reader, Node[] nodes, int activeLen, + long seed, boolean randomizeBlockLocationsPerBlock) { // If reader is not a datanode (not in NetworkTopology tree), we need to // replace this reader with a sibling leaf node in tree. 
if (reader != null && !this.contains(reader)) { @@ -293,7 +293,8 @@ public void sortByDistance( Node reader, Node[] nodes, return; } } - super.sortByDistance(reader, nodes, nodes.length, seed); + super.sortByDistance(reader, nodes, nodes.length, seed, + randomizeBlockLocationsPerBlock); } /** InnerNodeWithNodeGroup represents a switch/router of a data center, rack diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Tool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Tool.java index ce5b1fd4b9..3ff8e522de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Tool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Tool.java @@ -27,7 +27,7 @@ * *

Tool, is the standard for any Map-Reduce tool/application. * The tool/application should delegate the handling of - * + * * standard command-line options to {@link ToolRunner#run(Tool, String[])} * and only handle its custom arguments.

* diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java index 4b578c2377..a0da98b4fc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderCryptoExtension.java @@ -17,51 +17,112 @@ */ package org.apache.hadoop.crypto.key; -import org.apache.hadoop.conf.Configuration; -import org.junit.Assert; -import org.junit.Test; - import java.net.URI; import java.security.SecureRandom; +import java.util.Arrays; + +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.hadoop.conf.Configuration; +import org.junit.BeforeClass; +import org.junit.Test; + + +import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; public class TestKeyProviderCryptoExtension { private static final String CIPHER = "AES"; + private static final String ENCRYPTION_KEY_NAME = "fooKey"; + + private static Configuration conf; + private static KeyProvider kp; + private static KeyProviderCryptoExtension kpExt; + private static KeyProvider.Options options; + private static KeyVersion encryptionKey; + + @BeforeClass + public static void setup() throws Exception { + conf = new Configuration(); + kp = new UserProvider.Factory().createProvider(new URI("user:///"), conf); + kpExt = KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp); + options = new KeyProvider.Options(conf); + options.setCipher(CIPHER); + options.setBitLength(128); + encryptionKey = + kp.createKey(ENCRYPTION_KEY_NAME, SecureRandom.getSeed(16), options); + } @Test public void testGenerateEncryptedKey() throws Exception { - Configuration conf = new Configuration(); - KeyProvider kp = - new UserProvider.Factory().createProvider(new URI("user:///"), conf); - KeyProvider.Options options = new KeyProvider.Options(conf); - options.setCipher(CIPHER); - options.setBitLength(128); - KeyProvider.KeyVersion kv = kp.createKey("foo", SecureRandom.getSeed(16), - options); - KeyProviderCryptoExtension kpExt = - KeyProviderCryptoExtension.createKeyProviderCryptoExtension(kp); - + // Generate a new EEK and check it KeyProviderCryptoExtension.EncryptedKeyVersion ek1 = - kpExt.generateEncryptedKey(kv.getName()); - Assert.assertEquals(KeyProviderCryptoExtension.EEK, - ek1.getEncryptedKey().getVersionName()); - Assert.assertEquals("foo", ek1.getKeyName()); - Assert.assertNotNull(ek1.getEncryptedKey().getMaterial()); - Assert.assertEquals(kv.getMaterial().length, - ek1.getEncryptedKey().getMaterial().length); - KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1); - Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName()); - KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1); - Assert.assertArrayEquals(k1.getMaterial(), k1a.getMaterial()); - Assert.assertEquals(kv.getMaterial().length, k1.getMaterial().length); + kpExt.generateEncryptedKey(encryptionKey.getName()); + assertEquals("Version name of EEK should be EEK", + KeyProviderCryptoExtension.EEK, + ek1.getEncryptedKeyVersion().getVersionName()); + assertEquals("Name of EEK 
should be encryption key name", + ENCRYPTION_KEY_NAME, ek1.getEncryptionKeyName()); + assertNotNull("Expected encrypted key material", + ek1.getEncryptedKeyVersion().getMaterial()); + assertEquals("Length of encryption key material and EEK material should " + + "be the same", encryptionKey.getMaterial().length, + ek1.getEncryptedKeyVersion().getMaterial().length + ); - KeyProviderCryptoExtension.EncryptedKeyVersion ek2 = - kpExt.generateEncryptedKey(kv.getName()); - KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2); - boolean eq = true; - for (int i = 0; eq && i < ek2.getEncryptedKey().getMaterial().length; i++) { - eq = k2.getMaterial()[i] == k1.getMaterial()[i]; + // Decrypt EEK into an EK and check it + KeyVersion k1 = kpExt.decryptEncryptedKey(ek1); + assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName()); + assertEquals(encryptionKey.getMaterial().length, k1.getMaterial().length); + if (Arrays.equals(k1.getMaterial(), encryptionKey.getMaterial())) { + fail("Encrypted key material should not equal encryption key material"); } - Assert.assertFalse(eq); + if (Arrays.equals(ek1.getEncryptedKeyVersion().getMaterial(), + encryptionKey.getMaterial())) { + fail("Encrypted key material should not equal decrypted key material"); + } + // Decrypt it again and it should be the same + KeyVersion k1a = kpExt.decryptEncryptedKey(ek1); + assertArrayEquals(k1.getMaterial(), k1a.getMaterial()); + + // Generate another EEK and make sure it's different from the first + KeyProviderCryptoExtension.EncryptedKeyVersion ek2 = + kpExt.generateEncryptedKey(encryptionKey.getName()); + KeyVersion k2 = kpExt.decryptEncryptedKey(ek2); + if (Arrays.equals(k1.getMaterial(), k2.getMaterial())) { + fail("Generated EEKs should have different material!"); + } + if (Arrays.equals(ek1.getEncryptedKeyIv(), ek2.getEncryptedKeyIv())) { + fail("Generated EEKs should have different IVs!"); + } + } + + @Test + public void testEncryptDecrypt() throws Exception { + // Get an EEK + KeyProviderCryptoExtension.EncryptedKeyVersion eek = + kpExt.generateEncryptedKey(encryptionKey.getName()); + final byte[] encryptedKeyIv = eek.getEncryptedKeyIv(); + final byte[] encryptedKeyMaterial = eek.getEncryptedKeyVersion() + .getMaterial(); + // Decrypt it manually + Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding"); + cipher.init(Cipher.DECRYPT_MODE, + new SecretKeySpec(encryptionKey.getMaterial(), "AES"), + new IvParameterSpec(KeyProviderCryptoExtension.EncryptedKeyVersion + .deriveIV(encryptedKeyIv))); + final byte[] manualMaterial = cipher.doFinal(encryptedKeyMaterial); + // Decrypt it with the API + KeyVersion decryptedKey = kpExt.decryptEncryptedKey(eek); + final byte[] apiMaterial = decryptedKey.getMaterial(); + + assertArrayEquals("Wrong key material from decryptEncryptedKey", + manualMaterial, apiMaterial); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java index 4b04931f70..56b199a422 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestText.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.Random; +import com.google.common.base.Charsets; import com.google.common.primitives.Bytes; /** Unit tests for LargeUTF8. 
*/ @@ -363,6 +364,27 @@ public void testReadWriteOperations() { fail("testReadWriteOperations error !!!"); } } + + public void testReadWithKnownLength() throws IOException { + String line = "hello world"; + byte[] inputBytes = line.getBytes(Charsets.UTF_8); + DataInputBuffer in = new DataInputBuffer(); + Text text = new Text(); + + in.reset(inputBytes, inputBytes.length); + text.readWithKnownLength(in, 5); + assertEquals("hello", text.toString()); + + // Read longer length, make sure it lengthens + in.reset(inputBytes, inputBytes.length); + text.readWithKnownLength(in, 7); + assertEquals("hello w", text.toString()); + + // Read shorter length, make sure it shortens + in.reset(inputBytes, inputBytes.length); + text.readWithKnownLength(in, 2); + assertEquals("he", text.toString()); + } /** * test {@code Text.bytesToCodePoint(bytes) } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java index ca61c1e86b..657fae3f52 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java @@ -105,7 +105,7 @@ public void testSortByDistance() throws Exception { testNodes[2] = dataNodes[3]; testNodes[3] = dataNodes[0]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); assertTrue(testNodes[2] == dataNodes[2]); @@ -117,7 +117,7 @@ public void testSortByDistance() throws Exception { testNodes[2] = dataNodes[1]; testNodes[3] = dataNodes[0]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); @@ -127,7 +127,7 @@ public void testSortByDistance() throws Exception { testNodes[2] = dataNodes[2]; testNodes[3] = dataNodes[0]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[2]); @@ -137,7 +137,7 @@ public void testSortByDistance() throws Exception { testNodes[2] = dataNodes[2]; testNodes[3] = dataNodes[0]; cluster.sortByDistance(computeNode, testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[2]); } diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java index aafb7046fc..24af81be23 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/KMSServerJSONUtils.java @@ -64,12 +64,12 @@ public static Map toJSON(EncryptedKeyVersion encryptedKeyVersion) { Map json = new LinkedHashMap(); if (encryptedKeyVersion != null) { json.put(KMSRESTConstants.VERSION_NAME_FIELD, - encryptedKeyVersion.getKeyVersionName()); + encryptedKeyVersion.getEncryptionKeyVersionName()); 
json.put(KMSRESTConstants.IV_FIELD, Base64.encodeBase64URLSafeString( - encryptedKeyVersion.getIv())); + encryptedKeyVersion.getEncryptedKeyIv())); json.put(KMSRESTConstants.ENCRYPTED_KEY_VERSION_FIELD, - toJSON(encryptedKeyVersion.getEncryptedKey())); + toJSON(encryptedKeyVersion.getEncryptedKeyVersion())); } return json; } diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java index 26b334df45..021f3cb053 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java +++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java @@ -485,10 +485,10 @@ public Void call() throws Exception { EncryptedKeyVersion ek1 = kpExt.generateEncryptedKey(kv.getName()); Assert.assertEquals(KeyProviderCryptoExtension.EEK, - ek1.getEncryptedKey().getVersionName()); - Assert.assertNotNull(ek1.getEncryptedKey().getMaterial()); + ek1.getEncryptedKeyVersion().getVersionName()); + Assert.assertNotNull(ek1.getEncryptedKeyVersion().getMaterial()); Assert.assertEquals(kv.getMaterial().length, - ek1.getEncryptedKey().getMaterial().length); + ek1.getEncryptedKeyVersion().getMaterial().length); KeyProvider.KeyVersion k1 = kpExt.decryptEncryptedKey(ek1); Assert.assertEquals(KeyProviderCryptoExtension.EK, k1.getVersionName()); KeyProvider.KeyVersion k1a = kpExt.decryptEncryptedKey(ek1); @@ -498,8 +498,8 @@ public Void call() throws Exception { EncryptedKeyVersion ek2 = kpExt.generateEncryptedKey(kv.getName()); KeyProvider.KeyVersion k2 = kpExt.decryptEncryptedKey(ek2); boolean isEq = true; - for (int i = 0; isEq && i < ek2.getEncryptedKey().getMaterial().length; - i++) { + for (int i = 0; isEq && i < ek2.getEncryptedKeyVersion() + .getMaterial().length; i++) { isEq = k2.getMaterial()[i] == k1.getMaterial()[i]; } Assert.assertFalse(isEq); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 96a4c49498..f254f50709 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -1051,8 +1051,12 @@ public READDIR3Response mknod(XDR xdr, RpcInfo info) { @Override public REMOVE3Response remove(XDR xdr, RpcInfo info) { + return remove(xdr, getSecurityHandler(info), info.remoteAddress()); + } + + @VisibleForTesting + REMOVE3Response remove(XDR xdr, SecurityHandler securityHandler, SocketAddress remoteAddress) { REMOVE3Response response = new REMOVE3Response(Nfs3Status.NFS3_OK); - SecurityHandler securityHandler = getSecurityHandler(info); DFSClient dfsClient = clientCache.getDfsClient(securityHandler.getUser()); if (dfsClient == null) { response.setStatus(Nfs3Status.NFS3ERR_SERVERFAULT); @@ -1083,17 +1087,19 @@ public REMOVE3Response remove(XDR xdr, RpcInfo info) { return new REMOVE3Response(Nfs3Status.NFS3ERR_STALE); } + WccData errWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), + preOpDirAttr); + if (!checkAccessPrivilege(remoteAddress, AccessPrivilege.READ_WRITE)) { + return new REMOVE3Response(Nfs3Status.NFS3ERR_ACCES, errWcc); + } + String fileIdPath = dirFileIdPath + "/" + fileName; HdfsFileStatus fstat = 
Nfs3Utils.getFileStatus(dfsClient, fileIdPath); if (fstat == null) { - WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), - preOpDirAttr); - return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, dirWcc); + return new REMOVE3Response(Nfs3Status.NFS3ERR_NOENT, errWcc); } if (fstat.isDir()) { - WccData dirWcc = new WccData(Nfs3Utils.getWccAttr(preOpDirAttr), - preOpDirAttr); - return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, dirWcc); + return new REMOVE3Response(Nfs3Status.NFS3ERR_ISDIR, errWcc); } boolean result = dfsClient.delete(fileIdPath, false); diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java new file mode 100644 index 0000000000..b68fdb8510 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestClientAccessPrivilege.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.Nfs3Status; +import org.apache.hadoop.nfs.nfs3.response.REMOVE3Response; +import org.apache.hadoop.oncrpc.XDR; +import org.apache.hadoop.oncrpc.security.SecurityHandler; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; +import org.apache.hadoop.security.authorize.ProxyUsers; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestClientAccessPrivilege { + static MiniDFSCluster cluster = null; + static NfsConfiguration config = new NfsConfiguration(); + static DistributedFileSystem hdfs; + static NameNode nn; + static String testdir = "/tmp"; + static SecurityHandler securityHandler; + + @BeforeClass + public static void setup() throws Exception { + + String currentUser = System.getProperty("user.name"); + config.set(DefaultImpersonationProvider.getTestProvider() + .getProxySuperuserGroupConfKey(currentUser), "*"); + config.set(DefaultImpersonationProvider.getTestProvider() + .getProxySuperuserIpConfKey(currentUser), "*"); + ProxyUsers.refreshSuperUserGroupsConfiguration(config); + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); + cluster.waitActive(); + hdfs = cluster.getFileSystem(); + nn = cluster.getNameNode(); + + // Use ephemeral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + + securityHandler = Mockito.mock(SecurityHandler.class); + Mockito.when(securityHandler.getUser()).thenReturn( + System.getProperty("user.name")); + } + + @AfterClass + public static void shutdown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Before + public void createFiles() throws IllegalArgumentException, IOException { + hdfs.delete(new Path(testdir), true); + hdfs.mkdirs(new Path(testdir)); + DFSTestUtil.createFile(hdfs, new Path(testdir + "/f1"), 0, (short) 1, 0); + } + + @Test(timeout = 60000) + public void testClientAccessPrivilegeForRemove() throws Exception { + // Configure ro access for nfs1 service + config.set("dfs.nfs.exports.allowed.hosts", "* ro"); + + // Start nfs + Nfs3 nfs = new Nfs3(config); + nfs.startServiceInternal(false); + + RpcProgramNfs3 nfsd = (RpcProgramNfs3) nfs.getRpcProgram(); + + // Create a remove request + HdfsFileStatus status = nn.getRpcServer().getFileInfo(testdir); + long dirId = status.getFileId(); + + XDR xdr_req = new XDR(); + FileHandle handle = new FileHandle(dirId); + handle.serialize(xdr_req); + xdr_req.writeString("f1"); + + // Remove operation + REMOVE3Response response = nfsd.remove(xdr_req.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + + // Assert on return code + assertEquals("Incorrect return code", Nfs3Status.NFS3ERR_ACCES, + response.getStatus()); + + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 
d40cd12ccb..af09ddb5ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -331,6 +331,9 @@ Release 2.6.0 - UNRELEASED datanodes and change datanode to write block replicas using the specified storage type. (szetszwo) + HDFS-6701. Make seed optional in NetworkTopology#sortByDistance. + (Ashwin Shankar via wang) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) @@ -368,6 +371,12 @@ Release 2.6.0 - UNRELEASED HDFS-6667. In HDFS HA mode, Distcp/SLive with webhdfs on secure cluster fails with Client cannot authenticate via:[TOKEN, KERBEROS] error. (jing9) + HDFS-6704. Fix the command to launch JournalNode in HDFS-HA document. + (Akira AJISAKA via jing9) + + HDFS-6731. Run "hdfs zkfc-formatZK" on a server in a non-namenode will cause + a null pointer exception. (Masatake Iwasaki via brandonli) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -616,6 +625,8 @@ Release 2.5.0 - UNRELEASED HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes correctly. (szetszwo) + HDFS-6712. Document HDFS Multihoming Settings. (Arpit Agarwal) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) @@ -898,6 +909,9 @@ Release 2.5.0 - UNRELEASED HDFS-6378. NFS registration should timeout instead of hanging when portmap/rpcbind is not available (Abhiraj Butala via brandonli) + HDFS-6703. NFS: Files can be deleted from a read-only mount + (Srikanth Upputuri via brandonli) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index a79efacc28..9cea8d00fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -214,6 +214,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version"; public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT"; + public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block"; + public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false; + public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum"; public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 791c6dff5b..69b2b69541 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -345,7 +345,8 @@ private boolean isInactive(DatanodeInfo datanode) { /** Sort the located blocks by the distance to the target host. 
*/ public void sortLocatedBlocks(final String targethost, - final List locatedblocks) { + final List locatedblocks, + boolean randomizeBlockLocationsPerBlock) { //sort the blocks // As it is possible for the separation of node manager and datanode, // here we should get node but not datanode only . @@ -372,8 +373,8 @@ public void sortLocatedBlocks(final String targethost, --lastActiveIndex; } int activeLen = lastActiveIndex + 1; - networktopology.sortByDistance(client, b.getLocations(), activeLen, - b.getBlock().getBlockId()); + networktopology.sortByDistance(client, b.getLocations(), activeLen, b + .getBlock().getBlockId(), randomizeBlockLocationsPerBlock); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 1aa308cebe..d19c214ff3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -83,6 +83,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT; + import static org.apache.hadoop.util.Time.now; import java.io.BufferedWriter; @@ -527,6 +530,8 @@ private void logAuditEvent(boolean succeeded, private final FSImage fsImage; + private boolean randomizeBlockLocationsPerBlock; + /** * Notify that loading of this FSDirectory is complete, and * it is imageLoaded for use @@ -836,6 +841,10 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { alwaysUseDelegationTokensForTests = conf.getBoolean( DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT); + + this.randomizeBlockLocationsPerBlock = conf.getBoolean( + DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK, + DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT); this.dtSecretManager = createDelegationTokenSecretManager(conf); this.dir = new FSDirectory(this, conf); @@ -1699,17 +1708,17 @@ LocatedBlocks getBlockLocations(String clientMachine, String src, LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true, true); if (blocks != null) { - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, blocks.getLocatedBlocks()); - + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock); + // lastBlock is not part of getLocatedBlocks(), might need to sort it too LocatedBlock lastBlock = blocks.getLastLocatedBlock(); if (lastBlock != null) { ArrayList lastBlockList = Lists.newArrayListWithCapacity(1); lastBlockList.add(lastBlock); - blockManager.getDatanodeManager().sortLocatedBlocks( - clientMachine, lastBlockList); + blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine, + lastBlockList, randomizeBlockLocationsPerBlock); } } return blocks; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java index 8b7a9956c6..a42b1e3189 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSZKFailoverController.java @@ -122,6 +122,11 @@ public static DFSZKFailoverController create(Configuration conf) { "HA is not enabled for this namenode."); } String nnId = HAUtil.getNameNodeId(localNNConf, nsId); + if (nnId == null) { + String msg = "Could not get the namenode ID of this node. " + + "You may run zkfc on the node other than namenode."; + throw new HadoopIllegalArgumentException(msg); + } NameNode.initializeGenericKeys(localNNConf, nsId, nnId); DFSUtil.setGenericConf(localNNConf, nsId, nnId, ZKFC_CONF_KEYS); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4bef836e14..8230bc71e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2040,4 +2040,17 @@ + + dfs.namenode.randomize-block-locations-per-block + false + When fetching replica locations of a block, the replicas + are sorted based on network distance. This configuration parameter + determines whether the replicas at the same network distance are randomly + shuffled. By default, this is false, such that repeated requests for a block's + replicas always result in the same order. This potentially improves page cache + behavior. However, for some network topologies, it is desirable to shuffle this + order for better load balancing. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm index 3a34dffb68..ff6a42c259 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailabilityWithQJM.apt.vm @@ -416,8 +416,8 @@ HDFS High Availability Using the Quorum Journal Manager After all of the necessary configuration options have been set, you must start the JournalNode daemons on the set of machines where they will run. This - can be done by running the command "" and waiting - for the daemon to start on each of the relevant machines. + can be done by running the command "" and + waiting for the daemon to start on each of the relevant machines. Once the JournalNodes have been started, one must initially synchronize the two HA NameNodes' on-disk metadata. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsMultihoming.apt.vm b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsMultihoming.apt.vm new file mode 100644 index 0000000000..2be45671e2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HdfsMultihoming.apt.vm @@ -0,0 +1,145 @@ +~~ Licensed under the Apache License, Version 2.0 (the "License"); +~~ you may not use this file except in compliance with the License. +~~ You may obtain a copy of the License at +~~ +~~ http://www.apache.org/licenses/LICENSE-2.0 +~~ +~~ Unless required by applicable law or agreed to in writing, software +~~ distributed under the License is distributed on an "AS IS" BASIS, +~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+~~ See the License for the specific language governing permissions and +~~ limitations under the License. See accompanying LICENSE file. + + --- + Hadoop Distributed File System-${project.version} - Support for Multi-Homed Networks + --- + --- + ${maven.build.timestamp} + +HDFS Support for Multihomed Networks + + This document is targetted to cluster administrators deploying <<>> in + multihomed networks. Similar support for <<>>/<<>> is + work in progress and will be documented when available. + +%{toc|section=1|fromDepth=0} + +* Multihoming Background + + In multihomed networks the cluster nodes are connected to more than one + network interface. There could be multiple reasons for doing so. + + [[1]] <>: Security requirements may dictate that intra-cluster + traffic be confined to a different network than the network used to + transfer data in and out of the cluster. + + [[2]] <>: Intra-cluster traffic may use one or more high bandwidth + interconnects like Fiber Channel, Infiniband or 10GbE. + + [[3]] <>: The nodes may have multiple network adapters + connected to a single network to handle network adapter failure. + + + Note that NIC Bonding (also known as NIC Teaming or Link + Aggregation) is a related but separate topic. The following settings + are usually not applicable to a NIC bonding configuration which handles + multiplexing and failover transparently while presenting a single 'logical + network' to applications. + +* Fixing Hadoop Issues In Multihomed Environments + +** Ensuring HDFS Daemons Bind All Interfaces + + By default <<>> endpoints are specified as either hostnames or IP addresses. + In either case <<>> daemons will bind to a single IP address making + the daemons unreachable from other networks. + + The solution is to have separate setting for server endpoints to force binding + the wildcard IP address <<>> i.e. <<<0.0.0.0>>>. Do NOT supply a port + number with any of these settings. + +---- + + dfs.namenode.rpc-bind-host + 0.0.0.0 + + The actual address the RPC server will bind to. If this optional address is + set, it overrides only the hostname portion of dfs.namenode.rpc-address. + It can also be specified per name node or name service for HA/Federation. + This is useful for making the name node listen on all interfaces by + setting it to 0.0.0.0. + + + + + dfs.namenode.servicerpc-bind-host + 0.0.0.0 + + The actual address the service RPC server will bind to. If this optional address is + set, it overrides only the hostname portion of dfs.namenode.servicerpc-address. + It can also be specified per name node or name service for HA/Federation. + This is useful for making the name node listen on all interfaces by + setting it to 0.0.0.0. + + + + + dfs.namenode.http-bind-host + 0.0.0.0 + + The actual adress the HTTP server will bind to. If this optional address + is set, it overrides only the hostname portion of dfs.namenode.http-address. + It can also be specified per name node or name service for HA/Federation. + This is useful for making the name node HTTP server listen on all + interfaces by setting it to 0.0.0.0. + + + + + dfs.namenode.https-bind-host + 0.0.0.0 + + The actual adress the HTTPS server will bind to. If this optional address + is set, it overrides only the hostname portion of dfs.namenode.https-address. + It can also be specified per name node or name service for HA/Federation. + This is useful for making the name node HTTPS server listen on all + interfaces by setting it to 0.0.0.0. 
+ + +---- + +** Clients use Hostnames when connecting to DataNodes + + By default <<>> clients connect to DataNodes using the IP address + provided by the NameNode. Depending on the network configuration this + IP address may be unreachable by the clients. The fix is letting clients perform + their own DNS resolution of the DataNode hostname. The following setting + enables this behavior. + +---- + + dfs.client.use.datanode.hostname + true + Whether clients should use datanode hostnames when + connecting to datanodes. + + +---- + +** DataNodes use HostNames when connecting to other DataNodes + + Rarely, the NameNode-resolved IP address for a DataNode may be unreachable + from other DataNodes. The fix is to force DataNodes to perform their own + DNS resolution for inter-DataNode connections. The following setting enables + this behavior. + +---- + + dfs.datanode.use.datanode.hostname + true + Whether datanodes should use datanode hostnames when + connecting to other datanodes for data transfer. + + +---- + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java index 2e6383cc26..faf946004a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java @@ -60,7 +60,14 @@ public void setupDatanodes() { DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/d3/r1"), DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/d3/r1"), DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/d3/r2"), - DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2") + DFSTestUtil.getDatanodeDescriptor("13.13.13.13", "/d3/r2"), + DFSTestUtil.getDatanodeDescriptor("14.14.14.14", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("15.15.15.15", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("16.16.16.16", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("17.17.17.17", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("18.18.18.18", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("19.19.19.19", "/d4/r1"), + DFSTestUtil.getDatanodeDescriptor("20.20.20.20", "/d4/r1"), }; for (int i = 0; i < dataNodes.length; i++) { cluster.add(dataNodes[i]); @@ -107,7 +114,7 @@ public void testCreateInvalidTopology() throws Exception { @Test public void testRacks() throws Exception { - assertEquals(cluster.getNumOfRacks(), 5); + assertEquals(cluster.getNumOfRacks(), 6); assertTrue(cluster.isOnSameRack(dataNodes[0], dataNodes[1])); assertFalse(cluster.isOnSameRack(dataNodes[1], dataNodes[2])); assertTrue(cluster.isOnSameRack(dataNodes[2], dataNodes[3])); @@ -133,7 +140,7 @@ public void testSortByDistance() throws Exception { testNodes[1] = dataNodes[2]; testNodes[2] = dataNodes[0]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); assertTrue(testNodes[2] == dataNodes[2]); @@ -146,7 +153,7 @@ public void testSortByDistance() throws Exception { dtestNodes[3] = dataNodes[9]; dtestNodes[4] = dataNodes[10]; cluster.sortByDistance(dataNodes[8], dtestNodes, - dtestNodes.length - 2, 0xDEADBEEF); + dtestNodes.length - 2, 0xDEADBEEF, false); assertTrue(dtestNodes[0] == dataNodes[8]); assertTrue(dtestNodes[1] == dataNodes[11]); assertTrue(dtestNodes[2] == dataNodes[12]); @@ -158,7 +165,7 @@ public void testSortByDistance() throws 
Exception { testNodes[1] = dataNodes[3]; testNodes[2] = dataNodes[0]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); assertTrue(testNodes[2] == dataNodes[3]); @@ -168,7 +175,7 @@ public void testSortByDistance() throws Exception { testNodes[1] = dataNodes[3]; testNodes[2] = dataNodes[1]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[1]); assertTrue(testNodes[1] == dataNodes[3]); assertTrue(testNodes[2] == dataNodes[5]); @@ -178,7 +185,7 @@ public void testSortByDistance() throws Exception { testNodes[1] = dataNodes[5]; testNodes[2] = dataNodes[3]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEADBEEF); + testNodes.length, 0xDEADBEEF, false); assertTrue(testNodes[0] == dataNodes[1]); assertTrue(testNodes[1] == dataNodes[3]); assertTrue(testNodes[2] == dataNodes[5]); @@ -188,7 +195,7 @@ public void testSortByDistance() throws Exception { testNodes[1] = dataNodes[5]; testNodes[2] = dataNodes[3]; cluster.sortByDistance(dataNodes[0], testNodes, - testNodes.length, 0xDEAD); + testNodes.length, 0xDEAD, false); // sortByDistance does not take the "data center" layer into consideration // and it doesn't sort by getDistance, so 1, 5, 3 is also valid here assertTrue(testNodes[0] == dataNodes[1]); @@ -204,7 +211,27 @@ public void testSortByDistance() throws Exception { testNodes[1] = dataNodes[6]; testNodes[2] = dataNodes[7]; cluster.sortByDistance(dataNodes[i], testNodes, - testNodes.length, 0xBEADED+i); + testNodes.length, 0xBEADED+i, false); + if (first == null) { + first = testNodes[0]; + } else { + if (first != testNodes[0]) { + foundRandom = true; + break; + } + } + } + assertTrue("Expected to find a different first location", foundRandom); + // Array of rack local nodes with randomizeBlockLocationsPerBlock set to + // true + // Expect random order of block locations for same block + first = null; + for (int i = 1; i <= 4; i++) { + testNodes[0] = dataNodes[13]; + testNodes[1] = dataNodes[14]; + testNodes[2] = dataNodes[15]; + cluster.sortByDistance(dataNodes[15 + i], testNodes, testNodes.length, + 0xBEADED, true); if (first == null) { first = testNodes[0]; } else { diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 84295f3d23..c8a83bf64e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -159,6 +159,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5971. Move the default options for distcp -p to DistCpOptionSwitch. (clamb via wang) + MAPREDUCE-5963. 
ShuffleHandler DB schema should be versioned with + compatible/incompatible changes (Junping Du via jlowe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 02283782a5..1d781be145 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -82,10 +82,13 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -125,6 +128,7 @@ import org.jboss.netty.util.CharsetUtil; import org.mortbay.jetty.HttpHeaders; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; @@ -146,8 +150,9 @@ public class ShuffleHandler extends AuxiliaryService { Pattern.CASE_INSENSITIVE); private static final String STATE_DB_NAME = "mapreduce_shuffle_state"; - private static final String STATE_DB_SCHEMA_VERSION_KEY = "schema-version"; - private static final String STATE_DB_SCHEMA_VERSION = "1.0"; + private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version"; + protected static final NMDBSchemaVersion CURRENT_VERSION_INFO = + NMDBSchemaVersion.newInstance(1, 0); private int port; private ChannelFactory selector; @@ -466,18 +471,15 @@ private void startStore(Path recoveryRoot) throws IOException { Path dbPath = new Path(recoveryRoot, STATE_DB_NAME); LOG.info("Using state database at " + dbPath + " for recovery"); File dbfile = new File(dbPath.toString()); - byte[] schemaVersionData; try { stateDb = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating state database at " + dbfile); options.createIfMissing(true); try { stateDb = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = bytes(STATE_DB_SCHEMA_VERSION); - stateDb.put(bytes(STATE_DB_SCHEMA_VERSION_KEY), schemaVersionData); + storeVersion(); } catch (DBException dbExc) { throw new IOException("Unable to create state store", dbExc); } @@ -485,15 +487,69 @@ private void startStore(Path recoveryRoot) throws IOException { throw e; } } - if (schemaVersionData != null) { - String 
schemaVersion = asString(schemaVersionData); - // only support exact schema matches for now - if (!STATE_DB_SCHEMA_VERSION.equals(schemaVersion)) { - throw new IOException("Incompatible state database schema, found " - + schemaVersion + " expected " + STATE_DB_SCHEMA_VERSION); - } + checkVersion(); + } + + @VisibleForTesting + NMDBSchemaVersion loadVersion() throws IOException { + byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return NMDBSchemaVersion.newInstance(1, 0); + } + NMDBSchemaVersion version = + new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); + return version; + } + + private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException { + String key = STATE_DB_SCHEMA_VERSION_KEY; + byte[] data = + ((NMDBSchemaVersionPBImpl) version).getProto().toByteArray(); + try { + stateDb.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e.getMessage(), e); + } + } + + private void storeVersion() throws IOException { + storeSchemaVersion(CURRENT_VERSION_INFO); + } + + // Only used for test + @VisibleForTesting + void storeVersion(NMDBSchemaVersion version) throws IOException { + storeSchemaVersion(version); + } + + protected NMDBSchemaVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of DB schema is a major upgrade, and any + * compatible change of DB schema is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade shuffle info or remove incompatible old state. 
+ */ + private void checkVersion() throws IOException { + NMDBSchemaVersion loadedVersion = loadVersion(); + LOG.info("Loaded state DB schema version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing state DB schedma version info " + getCurrentVersion()); + storeVersion(); } else { - throw new IOException("State database schema version not found"); + throw new IOException( + "Incompatible version for state DB schema: expecting DB schema version " + + getCurrentVersion() + ", but loading version " + loadedVersion); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index e892c61f6c..0974cc6ab1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -67,6 +67,7 @@ import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.util.PureJavaCrc32; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; @@ -718,6 +720,94 @@ public void testRecovery() throws IOException { FileUtil.fullyDelete(tmpDir); } } + + @Test + public void testRecoveryFromOtherVersions() throws IOException { + final String user = "someuser"; + final ApplicationId appId = ApplicationId.newInstance(12345, 1); + final File tmpDir = new File(System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")), + TestShuffleHandler.class.getName()); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3); + ShuffleHandler shuffle = new ShuffleHandler(); + // emulate aux services startup with recovery enabled + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + tmpDir.mkdirs(); + try { + shuffle.init(conf); + shuffle.start(); + + // setup a shuffle token for an application + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + outputBuffer.reset(); + Token jt = new Token( + "identifier".getBytes(), "password".getBytes(), new Text(user), + new Text("shuffleService")); + jt.write(outputBuffer); + shuffle.initializeApplication(new ApplicationInitializationContext(user, + appId, ByteBuffer.wrap(outputBuffer.getData(), 0, + outputBuffer.getLength()))); + + // verify we are authorized to shuffle + int rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + 
// emulate shuffle handler restart + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0); + Assert.assertEquals(version, shuffle.getCurrentVersion()); + + // emulate shuffle handler restart with compatible version + NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1); + // update version info before close shuffle + shuffle.storeVersion(version11); + Assert.assertEquals(version11, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + shuffle.start(); + // shuffle version will be override by CURRENT_VERSION_INFO after restart + // successfully. + Assert.assertEquals(version, shuffle.loadVersion()); + // verify we are still authorized to shuffle to the old application + rc = getShuffleResponseCode(shuffle, jt); + Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); + + // emulate shuffle handler restart with incompatible version + NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1); + shuffle.storeVersion(version21); + Assert.assertEquals(version21, shuffle.loadVersion()); + shuffle.close(); + shuffle = new ShuffleHandler(); + shuffle.setRecoveryPath(new Path(tmpDir.toString())); + shuffle.init(conf); + + try { + shuffle.start(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for state DB schema:")); + } + + } finally { + if (shuffle != null) { + shuffle.close(); + } + FileUtil.fullyDelete(tmpDir); + } + } private static int getShuffleResponseCode(ShuffleHandler shuffle, Token jt) throws IOException { diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml index 4bfbf6359b..ec9329216d 100644 --- a/hadoop-project/src/site/site.xml +++ b/hadoop-project/src/site/site.xml @@ -89,6 +89,7 @@ + diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7880af5848..4c42fd7cdc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED YARN-2013. The diagnostics is always the ExitCodeException stack when the container crashes. (Tsuyoshi OZAWA via junping_du) + YARN-2295. Refactored DistributedShell to use public APIs of protocol records. + (Li Lu via jianhe) + OPTIMIZATIONS BUG FIXES @@ -85,6 +88,12 @@ Release 2.6.0 - UNRELEASED YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement (Leitao Guo via jlowe) + YARN-2273. NPE in ContinuousScheduling thread when we lose a node. + (Wei Yan via kasha) + + YARN-2313. Livelock can occur in FairScheduler when there are lots of + running apps (Tsuyoshi Ozawa via Sandy Ryza) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -415,6 +424,9 @@ Release 2.5.0 - UNRELEASED YARN-2270. Made TestFSDownload#testDownloadPublicWithStatCache be skipped when there’s no ancestor permissions. (Akira Ajisaka via zjshen) + YARN-2319. Made the MiniKdc instance start/close before/after the class of + TestRMWebServicesDelegationTokens. 
(Wenwu Peng via zjshen) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 2da958a08b..781d7a3592 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -194,6 +194,12 @@ + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 6722307d69..5e1cbbcd93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -94,7 +95,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.LogManager; import com.google.common.annotations.VisibleForTesting; @@ -522,6 +522,8 @@ public void run() throws YarnException, IOException { + appAttemptID.toString(), e); } + // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class + // are marked as LimitedPrivate Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); DataOutputBuffer dob = new DataOutputBuffer(); @@ -900,11 +902,6 @@ public LaunchContainerRunnable( public void run() { LOG.info("Setting up container launch container for containerid=" + container.getId()); - ContainerLaunchContext ctx = Records - .newRecord(ContainerLaunchContext.class); - - // Set the environment - ctx.setEnvironment(shellEnv); // Set the local resources Map localResources = new HashMap(); @@ -935,16 +932,13 @@ public void run() { return; } - LocalResource shellRsrc = Records.newRecord(LocalResource.class); - shellRsrc.setType(LocalResourceType.FILE); - shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + URL yarnUrl = null; try { - shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI( - renamedScriptPath.toString()))); + yarnUrl = ConverterUtils.getYarnUrlFromURI( + new URI(renamedScriptPath.toString())); } catch (URISyntaxException e) { LOG.error("Error when trying to use shell script path specified" + " in env, path=" + renamedScriptPath, e); - // A failure scenario on bad input such as invalid shell script path // We know we cannot continue launching the container // so we should release it. 
@@ -953,13 +947,13 @@ public void run() { numFailedContainers.incrementAndGet(); return; } - shellRsrc.setTimestamp(shellScriptPathTimestamp); - shellRsrc.setSize(shellScriptPathLen); + LocalResource shellRsrc = LocalResource.newInstance(yarnUrl, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + shellScriptPathLen, shellScriptPathTimestamp); localResources.put(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath, shellRsrc); shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command; } - ctx.setLocalResources(localResources); // Set the necessary command to execute on the allocated container Vector vargs = new Vector(5); @@ -986,16 +980,18 @@ public void run() { List commands = new ArrayList(); commands.add(command.toString()); - ctx.setCommands(commands); - // Set up tokens for the container too. Today, for normal shell commands, - // the container in distribute-shell doesn't need any tokens. We are - // populating them mainly for NodeManagers to be able to download any - // files in the distributed file-system. The tokens are otherwise also - // useful in cases, for e.g., when one is running a "hadoop dfs" command - // inside the distributed shell. - ctx.setTokens(allTokens.duplicate()); + // Set up ContainerLaunchContext, setting local resource, environment, + // command and token for constructor. + // Note for tokens: Set up tokens for the container too. Today, for normal + // shell commands, the container in distribute-shell doesn't need any + // tokens. We are populating them mainly for NodeManagers to be able to + // download anyfiles in the distributed file-system. The tokens are + // otherwise also useful in cases, for e.g., when one is running a + // "hadoop dfs" command inside the distributed shell. + ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( + localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } @@ -1024,15 +1020,13 @@ private ContainerRequest setupContainerAskForRM() { // setup requirements for hosts // using * as any host will do for the distributed shell app // set the priority for the request - Priority pri = Records.newRecord(Priority.class); // TODO - what is the range for priority? how to decide? 
- pri.setPriority(requestPriority); + Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(containerMemory); - capability.setVirtualCores(containerVirtualCores); + Resource capability = Resource.newInstance(containerMemory, + containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 3336ed9728..05fd883be9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -456,9 +456,6 @@ public boolean run() throws IOException, YarnException { appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources @@ -508,8 +505,6 @@ public boolean run() throws IOException, YarnException { addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " ")); } - // Set local resource info into app master container launch context - amContainer.setLocalResources(localResources); // Set the necessary security tokens as needed //amContainer.setContainerTokens(containerToken); @@ -550,8 +545,6 @@ public boolean run() throws IOException, YarnException { env.put("CLASSPATH", classPathEnv.toString()); - amContainer.setEnvironment(env); - // Set the necessary command to execute the application master Vector vargs = new Vector(30); @@ -587,14 +580,15 @@ public boolean run() throws IOException, YarnException { LOG.info("Completed setting up app master command " + command.toString()); List commands = new ArrayList(); commands.add(command.toString()); - amContainer.setCommands(commands); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( + localResources, env, commands, null, null, null); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(amMemory); - capability.setVirtualCores(amVCores); + Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application @@ -603,6 +597,7 @@ public boolean run() throws IOException, YarnException { // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { + // Note: 
Credentials class is marked as LimitedPrivate for HDFS and MapReduce Credentials credentials = new Credentials(); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); if (tokenRenewer == null || tokenRenewer.length() == 0) { @@ -627,9 +622,8 @@ public boolean run() throws IOException, YarnException { appContext.setAMContainerSpec(amContainer); // Set the priority for the application master - Priority pri = Records.newRecord(Priority.class); // TODO - what is the range for priority? how to decide? - pri.setPriority(amPriority); + Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 64657ad7cc..50a0755be6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1294,12 +1294,20 @@ private void setAMContainerCrashedDiagnosticsAndExitStatus( private String getAMContainerCrashedDiagnostics( RMAppAttemptContainerFinishedEvent finishEvent) { ContainerStatus status = finishEvent.getContainerStatus(); - String diagnostics = - "AM Container for " + finishEvent.getApplicationAttemptId() - + " exited with " + " exitCode: " + status.getExitStatus() + ". " - + "Check application tracking page: " + this.getTrackingUrl() - + " . Then, click on links to logs of each attempt for detailed output. "; - return diagnostics; + StringBuilder diagnosticsBuilder = new StringBuilder(); + diagnosticsBuilder.append("AM Container for ").append( + finishEvent.getApplicationAttemptId()).append( + " exited with ").append(" exitCode: ").append(status.getExitStatus()). 
+ append("\n"); + if (this.getTrackingUrl() != null) { + diagnosticsBuilder.append("For more detailed output,").append( + " check application tracking page:").append( + this.getTrackingUrl()).append( + "Then, click on links to logs of each attempt.\n"); + } + diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics()) + .append("Failing this attempt"); + return diagnosticsBuilder.toString(); } private static class FinalTransition extends BaseFinalTransition { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 18ccf9d8a9..27a0075c1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -135,7 +135,7 @@ public class FairScheduler extends public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); // How often fair shares are re-calculated (ms) - protected long UPDATE_INTERVAL = 500; + protected long updateInterval; private final int UPDATE_DEBUG_FREQUENCY = 5; private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; @@ -244,13 +244,13 @@ public QueueManager getQueueManager() { /** * A runnable which calls {@link FairScheduler#update()} every - * UPDATE_INTERVAL milliseconds. + * updateInterval milliseconds. */ private class UpdateThread implements Runnable { public void run() { while (true) { try { - Thread.sleep(UPDATE_INTERVAL); + Thread.sleep(updateInterval); update(); preemptTasksIfNecessary(); } catch (Exception e) { @@ -970,37 +970,27 @@ private synchronized void nodeUpdate(RMNode nm) { } } - private void continuousScheduling() { - while (true) { - List nodeIdList = new ArrayList(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. - synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } + void continuousSchedulingAttempt() { + List nodeIdList = new ArrayList(nodes.keySet()); + // Sort the nodes by space available on them, so that we offer + // containers on emptier nodes first, facilitating an even spread. This + // requires holding the scheduler lock, so that the space available on a + // node doesn't change during the sort. 
+ synchronized (this) { + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + } - // iterate all nodes - for (NodeId nodeId : nodeIdList) { - if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = getFSSchedulerNode(nodeId); - try { - if (Resources.fitsIn(minimumAllocation, - node.getAvailableResource())) { - attemptScheduling(node); - } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + - ": " + ex.toString(), ex); - } - } - } + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.warn("Error while doing sleep in continuous scheduling: " + - e.toString(), e); + if (node != null && Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.error("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); } } } @@ -1010,6 +1000,12 @@ private class NodeAvailableResourceComparator implements Comparator { @Override public int compare(NodeId n1, NodeId n2) { + if (!nodes.containsKey(n1)) { + return 1; + } + if (!nodes.containsKey(n2)) { + return -1; + } return RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); @@ -1210,6 +1206,15 @@ private synchronized void initScheduler(Configuration conf) waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); + updateInterval = this.conf.getUpdateInterval(); + if (updateInterval < 0) { + updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; + LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS + + " is invalid, so using default value " + + + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + + " ms instead"); + } + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); // This stores per-application scheduling information this.applications = @@ -1234,7 +1239,16 @@ private synchronized void initScheduler(Configuration conf) new Runnable() { @Override public void run() { - continuousScheduling(); + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.error("Continuous scheduling thread interrupted. Exiting. 
", + e); + return; + } + } } } ); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 0fd242d662..473c369b09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -123,6 +123,11 @@ public class FairSchedulerConfiguration extends Configuration { protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; protected static final int DEFAULT_MAX_ASSIGN = -1; + /** The update interval for calculating resources in FairScheduler .*/ + public static final String UPDATE_INTERVAL_MS = + CONF_PREFIX + "update-interval-ms"; + public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; + public FairSchedulerConfiguration() { super(); } @@ -246,6 +251,10 @@ public static Resource parseResourceConfigValue(String val) "Error reading resource config", ex); } } + + public long getUpdateInterval() { + return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS); + } private static int findResource(String val, String units) throws AllocationConfigurationException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index ca0fc3960a..1de35fcfa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -823,7 +823,9 @@ public void testAMCrashAtAllocated() { applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); - verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics()); + boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null); + verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(), + exitCode, shouldCheckURL); } @Test @@ -1241,11 +1243,18 @@ scheduler, masterService, submissionContext, new Configuration(), verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } - private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics) { - assertTrue("Diagnostic information does not contain application proxy URL", - diagnostics.contains(applicationAttempt.getWebProxyBase())); + private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics, + int exitCode, boolean shouldCheckURL) { assertTrue("Diagnostic information does not point the logs to the 
users", diagnostics.contains("logs")); + assertTrue("Diagnostic information does not contain application attempt id", + diagnostics.contains(applicationAttempt.getAppAttemptId().toString())); + assertTrue("Diagnostic information does not contain application exit code", + diagnostics.contains("exitCode: " + exitCode)); + if (shouldCheckURL) { + assertTrue("Diagnostic information does not contain application proxy URL", + diagnostics.contains(applicationAttempt.getWebProxyBase())); + } } private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index ed492cec40..df157e7500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2763,7 +2763,43 @@ public void testContinuousScheduling() throws Exception { Assert.assertEquals(2, nodes.size()); } - + @Test + public void testContinuousSchedulingWithNodeRemoved() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling once manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + Assert.assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + Assert.assertEquals("We should only have one alive node.", + 1, scheduler.getNumClusterNodes()); + + // Invoke the continuous scheduling once + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail("Exception happened when doing continuous scheduling. 
" + + e.toString()); + } + } + @Test public void testDontAllowUndeclaredPools() throws Exception{ conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 310104bd08..903c7af9cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -94,7 +94,7 @@ private void startResourceManager(float utilizationThreshold) { scheduler = (FairScheduler)resourceManager.getResourceScheduler(); scheduler.setClock(clock); - scheduler.UPDATE_INTERVAL = 60 * 1000; + scheduler.updateInterval = 60 * 1000; } private void registerNodeAndSubmitApp( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java index 9d25105bd4..62612affeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesDelegationTokens.java @@ -60,7 +60,9 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -90,28 +92,14 @@ @RunWith(Parameterized.class) public class TestRMWebServicesDelegationTokens extends JerseyTest { - private static final File testRootDir = new File("target", - TestRMWebServicesDelegationTokens.class.getName() + "-root"); + private static File testRootDir; private static File httpSpnegoKeytabFile = new File( KerberosTestUtils.getKeytabFile()); - private static String httpSpnegoPrincipal = KerberosTestUtils .getServerPrincipal(); - - private static boolean miniKDCStarted = false; private static MiniKdc testMiniKDC; - static { - try { - testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir); - } catch (Exception e) { - assertTrue("Couldn't create MiniKDC", false); - } - } - private static MockRM rm; - private Injector injector; - private boolean isKerberosAuth = false; // Make sure the test uses the published header string @@ -237,7 +225,6 @@ public TestRMWebServicesDelegationTokens(int run) throws Exception { .contextListenerClass(GuiceServletConfig.class) .filterClass(com.google.inject.servlet.GuiceFilter.class) 
.contextPath("jersey-guice-filter").servletPath("/").build()); - setupKDC(); switch (run) { case 0: default: @@ -249,17 +236,14 @@ public TestRMWebServicesDelegationTokens(int run) throws Exception { } } - private void setupKDC() throws Exception { - if (miniKDCStarted == false) { - testMiniKDC.start(); - getKdc().createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost", - "client", "client2", "client3"); - miniKDCStarted = true; - } - } - - private MiniKdc getKdc() { - return testMiniKDC; + @BeforeClass + public static void setupKDC() throws Exception { + testRootDir = new File("target", + TestRMWebServicesDelegationTokens.class.getName() + "-root"); + testMiniKDC = new MiniKdc(MiniKdc.createConf(), testRootDir); + testMiniKDC.start(); + testMiniKDC.createPrincipal(httpSpnegoKeytabFile, "HTTP/localhost", + "client", "client2", "client3"); } @Before @@ -270,6 +254,13 @@ public void setUp() throws Exception { testRootDir.deleteOnExit(); } + @AfterClass + public static void shutdownKdc() { + if (testMiniKDC != null) { + testMiniKDC.stop(); + } + } + @After @Override public void tearDown() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index b9cda2c254..9bb8563158 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -205,6 +205,12 @@ Properties that can be placed in yarn-site.xml instead. Defaults to true. If a queue placement policy is given in the allocations file, this property is ignored. + * <<>> + + * The interval at which to lock the scheduler and recalculate fair shares, + recalculate demand, and check whether anything is due for preemption. + Defaults to 500 ms. + Allocation file format The allocation file must be in XML format. The format contains five types of diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm index 4f3825b9d5..d91c513348 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/YarnCommands.apt.vm @@ -157,15 +157,16 @@ Usage: yarn [--config confdir] COMMAND Start the ResourceManager ------- - Usage: yarn resourcemanager [-format] + Usage: yarn resourcemanager [-format-state-store] ------- *---------------+--------------+ || COMMAND_OPTIONS || Description | *---------------+--------------+ -| -format | Formats the RMStateStore. This will clear the RMStateStore and is -| | useful if past applications are no longer needed. This should be run -| | only when the ResourceManager is not running. +| -format-state-store | Formats the RMStateStore. This will clear the +| | RMStateStore and is useful if past applications are no +| | longer needed. This should be run only when the +| | ResourceManager is not running. *---------------+--------------+ ** nodemanager
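
The FairScheduler change earlier in this patch replaces the hard-coded UPDATE_INTERVAL field with a value read through FairSchedulerConfiguration.getUpdateInterval(), falling back to the 500 ms default and guarding against negative values in initScheduler(). The standalone sketch below is not part of the patch; the full key string "yarn.scheduler.fair.update-interval-ms" is an assumption derived from CONF_PREFIX + "update-interval-ms" and should be verified against the actual CONF_PREFIX value.

----
import org.apache.hadoop.conf.Configuration;

// Hypothetical sketch: consume the new update-interval knob the way
// initScheduler() does -- read with a 500 ms default, then revert to the
// default if the configured value is negative.
public class UpdateIntervalSketch {
  // Assumed full key (CONF_PREFIX + "update-interval-ms"); verify before use.
  static final String UPDATE_INTERVAL_MS = "yarn.scheduler.fair.update-interval-ms";
  static final long DEFAULT_UPDATE_INTERVAL_MS = 500;

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong(UPDATE_INTERVAL_MS, 1000);  // e.g. recalculate fair shares once per second
    long updateInterval = conf.getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS);
    if (updateInterval < 0) {
      updateInterval = DEFAULT_UPDATE_INTERVAL_MS;  // invalid values fall back to the default
    }
    System.out.println("Update thread sleeps " + updateInterval + " ms between passes");
  }
}
----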
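
The ShuffleHandler state-store change earlier in this patch documents a major.minor versioning policy: a loaded version with the same major number is compatible (the stored version info is overwritten and startup proceeds), while a different major number is incompatible (startup fails and a separate upgrade tool or state removal is required). A minimal, hypothetical illustration of that policy, using a stand-in class rather than the real NMDBSchemaVersion record:

----
// Stand-in for NMDBSchemaVersion, only to illustrate the compatibility rule.
final class SchemaVersion {
  private final int major;
  private final int minor;

  SchemaVersion(int major, int minor) {
    this.major = major;
    this.minor = minor;
  }

  // Same major version: compatible (minor bumps only). Different major: incompatible.
  boolean isCompatibleTo(SchemaVersion other) {
    return major == other.major;
  }

  @Override
  public String toString() {
    return major + "." + minor;
  }

  public static void main(String[] args) {
    SchemaVersion current = new SchemaVersion(1, 0);
    System.out.println(new SchemaVersion(1, 1).isCompatibleTo(current)); // true: overwrite and proceed
    System.out.println(new SchemaVersion(2, 1).isCompatibleTo(current)); // false: fail startup
  }
}
----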