Merge branch 'trunk' into HDFS-6584

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
This commit is contained in:
Tsz-Wo Nicholas Sze 2014-09-18 13:00:29 +08:00
commit 2d2b0009e6
39 changed files with 845 additions and 133 deletions

View File

@ -530,6 +530,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang) HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang)
HADOOP-11016. KMS should support signing cookies with zookeeper secret
manager. (tucu)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd) HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -721,8 +724,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang HADOOP-11056. OsSecureRandom.setConf() might leak file descriptors (yzhang
via cmccabe) via cmccabe)
HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files HADOOP-11040. Return value of read(ByteBuffer buf) in CryptoInputStream is
(cmccabe) incorrect in some cases. (Yi Liu via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS

View File

@ -471,7 +471,16 @@ public int read(ByteBuffer buf) throws IOException {
streamOffset += n; // Read n bytes streamOffset += n; // Read n bytes
decrypt(buf, n, pos); decrypt(buf, n, pos);
} }
return n;
if (n >= 0) {
return unread + n;
} else {
if (unread == 0) {
return -1;
} else {
return unread;
}
}
} }
throw new UnsupportedOperationException("ByteBuffer read unsupported " + throw new UnsupportedOperationException("ByteBuffer read unsupported " +

View File

@ -46,7 +46,8 @@ public abstract KeyProvider createProvider(URI providerName,
) throws IOException; ) throws IOException;
private static final ServiceLoader<KeyProviderFactory> serviceLoader = private static final ServiceLoader<KeyProviderFactory> serviceLoader =
ServiceLoader.load(KeyProviderFactory.class); ServiceLoader.load(KeyProviderFactory.class,
KeyProviderFactory.class.getClassLoader());
// Iterate through the serviceLoader to avoid lazy loading. // Iterate through the serviceLoader to avoid lazy loading.
// Lazy loading would require synchronization in concurrent use cases. // Lazy loading would require synchronization in concurrent use cases.

View File

@ -200,6 +200,15 @@ public long getAccessTime() {
public FsPermission getPermission() { public FsPermission getPermission() {
return permission; return permission;
} }
/**
* Tell whether the underlying file or directory is encrypted or not.
*
* @return true if the underlying file is encrypted.
*/
public boolean isEncrypted() {
return permission.getEncryptedBit();
}
/** /**
* Get the owner of the file. * Get the owner of the file.

View File

@ -294,6 +294,13 @@ public boolean getAclBit() {
return false; return false;
} }
/**
* Returns true if the file is encrypted or directory is in an encryption zone
*/
public boolean getEncryptedBit() {
return false;
}
/** Set the user file creation mask (umask) */ /** Set the user file creation mask (umask) */
public static void setUMask(Configuration conf, FsPermission umask) { public static void setUMask(Configuration conf, FsPermission umask) {
conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort())); conf.set(UMASK_LABEL, String.format("%1$03o", umask.toShort()));

View File

@ -64,6 +64,33 @@ all operations on a valid FileSystem MUST result in a new FileSystem that is als
def isSymlink(FS, p) = p in symlinks(FS) def isSymlink(FS, p) = p in symlinks(FS)
### 'boolean inEncryptionZone(Path p)'
Return True if the data for p is encrypted. The nature of the encryption and the
mechanism for creating an encryption zone are implementation details not covered
in this specification. No guarantees are made about the quality of the
encryption. The metadata is not encrypted.
#### Preconditions
if not exists(FS, p) : raise FileNotFoundException
#### Postconditions
#### Invariants
All files and directories under a directory in an encryption zone are also in an
encryption zone
forall d in directories(FS): inEncyptionZone(FS, d) implies
forall c in children(FS, d) where (isFile(FS, c) or isDir(FS, c)) :
inEncyptionZone(FS, c)
For all files in an encrypted zone, the data is encrypted, but the encryption
type and specification are not defined.
forall f in files(FS) where inEncyptionZone(FS, c):
isEncrypted(data(f))
### `FileStatus getFileStatus(Path p)` ### `FileStatus getFileStatus(Path p)`
@ -88,6 +115,10 @@ Get the status of a path
stat.length = 0 stat.length = 0
stat.isdir = False stat.isdir = False
stat.symlink = FS.Symlinks[p] stat.symlink = FS.Symlinks[p]
if inEncryptionZone(FS, p) :
stat.isEncrypted = True
else
stat.isEncrypted = False
### `Path getHomeDirectory()` ### `Path getHomeDirectory()`

View File

@ -469,6 +469,7 @@ private void byteBufferReadCheck(InputStream in, ByteBuffer buf,
int bufPos) throws Exception { int bufPos) throws Exception {
buf.position(bufPos); buf.position(bufPos);
int n = ((ByteBufferReadable) in).read(buf); int n = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(bufPos + n, buf.position());
byte[] readData = new byte[n]; byte[] readData = new byte[n];
buf.rewind(); buf.rewind();
buf.position(bufPos); buf.position(bufPos);
@ -568,6 +569,7 @@ public void testCombinedOp() throws Exception {
// Read forward len1 // Read forward len1
ByteBuffer buf = ByteBuffer.allocate(len1); ByteBuffer buf = ByteBuffer.allocate(len1);
int nRead = ((ByteBufferReadable) in).read(buf); int nRead = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead]; readData = new byte[nRead];
buf.rewind(); buf.rewind();
buf.get(readData); buf.get(readData);
@ -575,9 +577,10 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, nRead); System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData); Assert.assertArrayEquals(readData, expectedData);
// Pos should be len1 + 2 * len2 + nRead long lastPos = pos;
// Pos should be lastPos + nRead
pos = ((Seekable) in).getPos(); pos = ((Seekable) in).getPos();
Assert.assertEquals(len1 + 2 * len2 + nRead, pos); Assert.assertEquals(lastPos + nRead, pos);
// Pos: 1/3 dataLen // Pos: 1/3 dataLen
positionedReadCheck(in , dataLen / 3); positionedReadCheck(in , dataLen / 3);
@ -589,13 +592,15 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, len1); System.arraycopy(data, (int)pos, expectedData, 0, len1);
Assert.assertArrayEquals(readData, expectedData); Assert.assertArrayEquals(readData, expectedData);
// Pos should be 2 * len1 + 2 * len2 + nRead lastPos = pos;
// Pos should be lastPos + len1
pos = ((Seekable) in).getPos(); pos = ((Seekable) in).getPos();
Assert.assertEquals(2 * len1 + 2 * len2 + nRead, pos); Assert.assertEquals(lastPos + len1, pos);
// Read forward len1 // Read forward len1
buf = ByteBuffer.allocate(len1); buf = ByteBuffer.allocate(len1);
nRead = ((ByteBufferReadable) in).read(buf); nRead = ((ByteBufferReadable) in).read(buf);
Assert.assertEquals(nRead, buf.position());
readData = new byte[nRead]; readData = new byte[nRead];
buf.rewind(); buf.rewind();
buf.get(readData); buf.get(readData);
@ -603,6 +608,11 @@ public void testCombinedOp() throws Exception {
System.arraycopy(data, (int)pos, expectedData, 0, nRead); System.arraycopy(data, (int)pos, expectedData, 0, nRead);
Assert.assertArrayEquals(readData, expectedData); Assert.assertArrayEquals(readData, expectedData);
lastPos = pos;
// Pos should be lastPos + nRead
pos = ((Seekable) in).getPos();
Assert.assertEquals(lastPos + nRead, pos);
// ByteBuffer read after EOF // ByteBuffer read after EOF
((Seekable) in).seek(dataLen); ((Seekable) in).seek(dataLen);
buf.clear(); buf.clear();

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.junit.Test; import org.junit.Test;
@ -30,6 +31,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
/** /**
@ -65,6 +67,16 @@ public void testOpenReadZeroByteFile() throws Throwable {
assertMinusOne("initial byte read", result); assertMinusOne("initial byte read", result);
} }
@Test
public void testFsIsEncrypted() throws Exception {
describe("create an empty file and call FileStatus.isEncrypted()");
final Path path = path("file");
createFile(getFileSystem(), path, false, new byte[0]);
final FileStatus stat = getFileSystem().getFileStatus(path);
assertFalse("Expecting false for stat.isEncrypted()",
stat.isEncrypted());
}
@Test @Test
public void testOpenReadDir() throws Throwable { public void testOpenReadDir() throws Throwable {
describe("create & read a directory"); describe("create & read a directory");

View File

@ -187,6 +187,11 @@
<artifactId>metrics-core</artifactId> <artifactId>metrics-core</artifactId>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -16,7 +16,7 @@
<!-- KMS Backend KeyProvider --> <!-- KMS Backend KeyProvider -->
<property> <property>
<name>hadoop.security.key.provider.path</name> <name>hadoop.kms.key.provider.uri</name>
<value>jceks://file@/${user.home}/kms.keystore</value> <value>jceks://file@/${user.home}/kms.keystore</value>
<description> <description>
</description> </description>
@ -68,4 +68,61 @@
</description> </description>
</property> </property>
<!-- Authentication cookie signature source -->
<property>
<name>hadoop.kms.authentication.signer.secret.provider</name>
<value>random</value>
<description>
Indicates how the secret to sign the authentication cookies will be
stored. Options are 'random' (default), 'string' and 'zookeeper'.
If using a setup with multiple KMS instances, 'zookeeper' should be used.
</description>
</property>
<!-- Configuration for 'zookeeper' authentication cookie signature source -->
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
<value>/hadoop-kms/hadoop-auth-signature-secret</value>
<description>
The Zookeeper ZNode path where the KMS instances will store and retrieve
the secret from.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
<value>#HOSTNAME#:#PORT#,...</value>
<description>
The Zookeeper connection string, a list of hostnames and port comma
separated.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
<value>kerberos</value>
<description>
The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
<value>/etc/hadoop/conf/kms.keytab</value>
<description>
The absolute path for the Kerberos keytab with the credentials to
connect to Zookeeper.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
<value>kms/#HOSTNAME#</value>
<description>
The Kerberos service principal used to connect to Zookeeper.
</description>
</property>
</configuration> </configuration>

View File

@ -46,7 +46,8 @@
@InterfaceAudience.Private @InterfaceAudience.Private
public class KMSAuthenticationFilter public class KMSAuthenticationFilter
extends DelegationTokenAuthenticationFilter { extends DelegationTokenAuthenticationFilter {
private static final String CONF_PREFIX = KMSConfiguration.CONFIG_PREFIX +
public static final String CONFIG_PREFIX = KMSConfiguration.CONFIG_PREFIX +
"authentication."; "authentication.";
@Override @Override
@ -56,9 +57,9 @@ protected Properties getConfiguration(String configPrefix,
Configuration conf = KMSWebApp.getConfiguration(); Configuration conf = KMSWebApp.getConfiguration();
for (Map.Entry<String, String> entry : conf) { for (Map.Entry<String, String> entry : conf) {
String name = entry.getKey(); String name = entry.getKey();
if (name.startsWith(CONF_PREFIX)) { if (name.startsWith(CONFIG_PREFIX)) {
String value = conf.get(name); String value = conf.get(name);
name = name.substring(CONF_PREFIX.length()); name = name.substring(CONFIG_PREFIX.length());
props.setProperty(name, value); props.setProperty(name, value);
} }
} }

View File

@ -40,6 +40,10 @@ public class KMSConfiguration {
public static final String KEY_ACL_PREFIX = "key.acl."; public static final String KEY_ACL_PREFIX = "key.acl.";
public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl."; public static final String DEFAULT_KEY_ACL_PREFIX = "default.key.acl.";
// Property to set the backing KeyProvider
public static final String KEY_PROVIDER_URI = CONFIG_PREFIX +
"key.provider.uri";
// Property to Enable/Disable Caching // Property to Enable/Disable Caching
public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX + public static final String KEY_CACHE_ENABLE = CONFIG_PREFIX +
"cache.enable"; "cache.enable";

View File

@ -39,6 +39,7 @@
import javax.servlet.ServletContextListener; import javax.servlet.ServletContextListener;
import java.io.File; import java.io.File;
import java.net.URI;
import java.net.URL; import java.net.URL;
import java.util.List; import java.util.List;
@ -159,17 +160,12 @@ public void contextInitialized(ServletContextEvent sce) {
new AccessControlList(AccessControlList.WILDCARD_ACL_VALUE)); new AccessControlList(AccessControlList.WILDCARD_ACL_VALUE));
// intializing the KeyProvider // intializing the KeyProvider
String providerString = kmsConf.get(KMSConfiguration.KEY_PROVIDER_URI);
List<KeyProvider> providers = KeyProviderFactory.getProviders(kmsConf); if (providerString == null) {
if (providers.isEmpty()) {
throw new IllegalStateException("No KeyProvider has been defined"); throw new IllegalStateException("No KeyProvider has been defined");
} }
if (providers.size() > 1) { KeyProvider keyProvider =
LOG.warn("There is more than one KeyProvider configured '{}', using " + KeyProviderFactory.get(new URI(providerString), kmsConf);
"the first provider",
kmsConf.get(KeyProviderFactory.KEY_PROVIDER_PATH));
}
KeyProvider keyProvider = providers.get(0);
if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE, if (kmsConf.getBoolean(KMSConfiguration.KEY_CACHE_ENABLE,
KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) { KMSConfiguration.KEY_CACHE_ENABLE_DEFAULT)) {
long keyTimeOutMillis = long keyTimeOutMillis =

View File

@ -51,7 +51,7 @@ Hadoop Key Management Server (KMS) - Documentation Sets ${project.version}
+---+ +---+
<property> <property>
<name>hadoop.security.key.provider.path</name> <name>hadoop.kms.key.provider.uri</name>
<value>jceks://file@/${user.home}/kms.keystore</value> <value>jceks://file@/${user.home}/kms.keystore</value>
</property> </property>
@ -448,16 +448,16 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS supports access control for all non-read operations at the Key level. KMS supports access control for all non-read operations at the Key level.
All Key Access operations are classified as : All Key Access operations are classified as :
* MANAGEMENT - createKey, deleteKey, rolloverNewVersion * MANAGEMENT - createKey, deleteKey, rolloverNewVersion
* GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys * GENERATE_EEK - generateEncryptedKey, warmUpEncryptedKeys
* DECRYPT_EEK - decryptEncryptedKey; * DECRYPT_EEK - decryptEncryptedKey
* READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata, * READ - getKeyVersion, getKeyVersions, getMetadata, getKeysMetadata,
getCurrentKey; getCurrentKey
* ALL - all of the above; * ALL - all of the above
These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows These can be defined in the KMS <<<etc/hadoop/kms-acls.xml>>> as follows
@ -554,41 +554,124 @@ $ keytool -genkey -alias tomcat -keyalg RSA
KMS delegation token secret manager can be configured with the following KMS delegation token secret manager can be configured with the following
properties: properties:
+---+ +---+
<property> <property>
<name>hadoop.kms.authentication.delegation-token.update-interval.sec</name> <name>hadoop.kms.authentication.delegation-token.update-interval.sec</name>
<value>86400</value> <value>86400</value>
<description> <description>
How often the master key is rotated, in seconds. Default value 1 day. How often the master key is rotated, in seconds. Default value 1 day.
</description> </description>
</property> </property>
<property> <property>
<name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name> <name>hadoop.kms.authentication.delegation-token.max-lifetime.sec</name>
<value>604800</value> <value>604800</value>
<description> <description>
Maximum lifetime of a delagation token, in seconds. Default value 7 days. Maximum lifetime of a delagation token, in seconds. Default value 7 days.
</description> </description>
</property> </property>
<property> <property>
<name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name> <name>hadoop.kms.authentication.delegation-token.renew-interval.sec</name>
<value>86400</value> <value>86400</value>
<description> <description>
Renewal interval of a delagation token, in seconds. Default value 1 day. Renewal interval of a delagation token, in seconds. Default value 1 day.
</description> </description>
</property> </property>
<property> <property>
<name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name> <name>hadoop.kms.authentication.delegation-token.removal-scan-interval.sec</name>
<value>3600</value> <value>3600</value>
<description> <description>
Scan interval to remove expired delegation tokens. Scan interval to remove expired delegation tokens.
</description> </description>
</property> </property>
+---+ +---+
** Using Multiple Instances of KMS Behind a Load-Balancer or VIP
KMS supports multiple KMS instances behind a load-balancer or VIP for
scalability and for HA purposes.
When using multiple KMS instances behind a load-balancer or VIP, requests from
the same user may be handled by different KMS instances.
KMS instances behind a load-balancer or VIP must be specially configured to
work properly as a single logical service.
*** HTTP Kerberos Principals Configuration
TBD
*** HTTP Authentication Signature
KMS uses Hadoop Authentication for HTTP authentication. Hadoop Authentication
issues a signed HTTP Cookie once the client has authenticated successfully.
This HTTP Cookie has an expiration time, after which it will trigger a new
authentication sequence. This is done to avoid triggering the authentication
on every HTTP request of a client.
A KMS instance must verify the HTTP Cookie signatures signed by other KMS
instances. To do this all KMS instances must share the signing secret.
This secret sharing can be done using a Zookeeper service which is configured
in KMS with the following properties in the <<<kms-site.xml>>>:
+---+
<property>
<name>hadoop.kms.authentication.signer.secret.provider</name>
<value>zookeeper</value>
<description>
Indicates how the secret to sign the authentication cookies will be
stored. Options are 'random' (default), 'string' and 'zookeeper'.
If using a setup with multiple KMS instances, 'zookeeper' should be used.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.path</name>
<value>/hadoop-kms/hadoop-auth-signature-secret</value>
<description>
The Zookeeper ZNode path where the KMS instances will store and retrieve
the secret from.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.connection.string</name>
<value>#HOSTNAME#:#PORT#,...</value>
<description>
The Zookeeper connection string, a list of hostnames and port comma
separated.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.auth.type</name>
<value>kerberos</value>
<description>
The Zookeeper authentication type, 'none' or 'sasl' (Kerberos).
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.keytab</name>
<value>/etc/hadoop/conf/kms.keytab</value>
<description>
The absolute path for the Kerberos keytab with the credentials to
connect to Zookeeper.
</description>
</property>
<property>
<name>hadoop.kms.authentication.signer.secret.provider.zookeeper.kerberos.principal</name>
<value>kms/#HOSTNAME#</value>
<description>
The Kerberos service principal used to connect to Zookeeper.
</description>
</property>
+---+
*** Delegation Tokens
TBD
** KMS HTTP REST API ** KMS HTTP REST API
*** Create a Key *** Create a Key

View File

@ -166,7 +166,7 @@ public void start() throws Exception {
File kmsFile = new File(kmsConfDir, "kms-site.xml"); File kmsFile = new File(kmsConfDir, "kms-site.xml");
if (!kmsFile.exists()) { if (!kmsFile.exists()) {
Configuration kms = new Configuration(false); Configuration kms = new Configuration(false);
kms.set("hadoop.security.key.provider.path", kms.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri()); "jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri());
kms.set("hadoop.kms.authentication.type", "simple"); kms.set("hadoop.kms.authentication.type", "simple");
kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false); kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);

View File

@ -117,13 +117,14 @@ protected void runServer(String keystore, String password, File confDir,
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception { protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false); Configuration conf = new Configuration(false);
conf.set("hadoop.security.key.provider.path", conf.set(KMSConfiguration.KEY_PROVIDER_URI,
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri()); "jceks://file@" + new Path(keyStoreDir.getAbsolutePath(), "kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple"); conf.set("hadoop.kms.authentication.type", "simple");
return conf; return conf;
} }
protected void writeConf(File confDir, Configuration conf) throws Exception { public static void writeConf(File confDir, Configuration conf)
throws Exception {
Writer writer = new FileWriter(new File(confDir, Writer writer = new FileWriter(new File(confDir,
KMSConfiguration.KMS_SITE_XML)); KMSConfiguration.KMS_SITE_XML));
conf.writeXml(writer); conf.writeXml(writer);
@ -139,7 +140,7 @@ protected void writeConf(File confDir, Configuration conf) throws Exception {
writer.close(); writer.close();
} }
protected URI createKMSUri(URL kmsUrl) throws Exception { public static URI createKMSUri(URL kmsUrl) throws Exception {
String str = kmsUrl.toString(); String str = kmsUrl.toString();
str = str.replaceFirst("://", "@"); str = str.replaceFirst("://", "@");
return new URI("kms://" + str); return new URI("kms://" + str);

View File

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms.server;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProvider.Options;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginContext;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URL;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
public class TestKMSWithZK {
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
Configuration conf = new Configuration(false);
conf.set("hadoop.security.key.provider.path",
"jceks://file@" + new Path(keyStoreDir.getAbsolutePath(),
"kms.keystore").toUri());
conf.set("hadoop.kms.authentication.type", "simple");
conf.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
conf.set(KMSACLs.Type.GET_KEYS.getAclConfigKey(), "foo");
return conf;
}
@Test
public void testMultipleKMSInstancesWithZKSigner() throws Exception {
final File testDir = TestKMS.getTestDir();
Configuration conf = createBaseKMSConf(testDir);
TestingServer zkServer = new TestingServer();
zkServer.start();
MiniKMS kms1 = null;
MiniKMS kms2 = null;
conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
AuthenticationFilter.SIGNER_SECRET_PROVIDER, "zookeeper");
conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
ZKSignerSecretProvider.ZOOKEEPER_CONNECTION_STRING,
zkServer.getConnectString());
conf.set(KMSAuthenticationFilter.CONFIG_PREFIX +
ZKSignerSecretProvider.ZOOKEEPER_PATH, "/secret");
TestKMS.writeConf(testDir, conf);
try {
kms1 = new MiniKMS.Builder()
.setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
kms1.start();
kms2 = new MiniKMS.Builder()
.setKmsConfDir(testDir).setLog4jConfFile("log4j.properties").build();
kms2.start();
final URL url1 = new URL(kms1.getKMSUrl().toExternalForm() +
KMSRESTConstants.SERVICE_VERSION + "/" +
KMSRESTConstants.KEYS_NAMES_RESOURCE);
final URL url2 = new URL(kms2.getKMSUrl().toExternalForm() +
KMSRESTConstants.SERVICE_VERSION + "/" +
KMSRESTConstants.KEYS_NAMES_RESOURCE);
final DelegationTokenAuthenticatedURL.Token token =
new DelegationTokenAuthenticatedURL.Token();
final DelegationTokenAuthenticatedURL aUrl =
new DelegationTokenAuthenticatedURL();
UserGroupInformation ugiFoo = UserGroupInformation.createUserForTesting(
"foo", new String[]{"gfoo"});
UserGroupInformation ugiBar = UserGroupInformation.createUserForTesting(
"bar", new String[]{"gBar"});
ugiFoo.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HttpURLConnection conn = aUrl.openConnection(url1, token);
Assert.assertEquals(HttpURLConnection.HTTP_OK,
conn.getResponseCode());
return null;
}
});
ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
HttpURLConnection conn = aUrl.openConnection(url2, token);
Assert.assertEquals(HttpURLConnection.HTTP_OK,
conn.getResponseCode());
return null;
}
});
ugiBar.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
final DelegationTokenAuthenticatedURL.Token emptyToken =
new DelegationTokenAuthenticatedURL.Token();
HttpURLConnection conn = aUrl.openConnection(url2, emptyToken);
Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN,
conn.getResponseCode());
return null;
}
});
} finally {
if (kms2 != null) {
kms2.stop();
}
if (kms1 != null) {
kms1.stop();
}
zkServer.stop();
}
}
}

View File

@ -544,6 +544,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6705. Create an XAttr that disallows the HDFS admin from accessing a HDFS-6705. Create an XAttr that disallows the HDFS admin from accessing a
file. (clamb via wang) file. (clamb via wang)
HDFS-6843. Create FileStatus isEncrypted() method (clamb via cmccabe)
HDFS-7004. Update KeyProvider instantiation to create by URI. (wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -743,6 +747,14 @@ Release 2.6.0 - UNRELEASED
and TestDFSClientFailover.testDoesntDnsResolveLogicalURI failing on jdk7. and TestDFSClientFailover.testDoesntDnsResolveLogicalURI failing on jdk7.
(Akira Ajisaka via wang) (Akira Ajisaka via wang)
HDFS-6912. SharedFileDescriptorFactory should not allocate sparse files
(cmccabe)
HDFS-7075. hadoop-fuse-dfs fails because it cannot find
JavaKeyStoreProvider$Factory (cmccabe)
HDFS-7078. Fix listEZs to work correctly with snapshots. (wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -595,6 +595,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
// Journal-node related configs. These are read on the JN side. // Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

View File

@ -1794,34 +1794,37 @@ public static void assertAllResultsEqual(Collection<?> objects)
* Creates a new KeyProviderCryptoExtension by wrapping the * Creates a new KeyProviderCryptoExtension by wrapping the
* KeyProvider specified in the given Configuration. * KeyProvider specified in the given Configuration.
* *
* @param conf Configuration specifying a single, non-transient KeyProvider. * @param conf Configuration
* @return new KeyProviderCryptoExtension, or null if no provider was found. * @return new KeyProviderCryptoExtension, or null if no provider was found.
* @throws IOException if the KeyProvider is improperly specified in * @throws IOException if the KeyProvider is improperly specified in
* the Configuration * the Configuration
*/ */
public static KeyProviderCryptoExtension createKeyProviderCryptoExtension( public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
final Configuration conf) throws IOException { final Configuration conf) throws IOException {
final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf); final String providerUriStr =
if (providers == null || providers.size() == 0) { conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, null);
// No provider set in conf
if (providerUriStr == null) {
return null; return null;
} }
if (providers.size() > 1) { final URI providerUri;
StringBuilder builder = new StringBuilder(); try {
builder.append("Found multiple KeyProviders but only one is permitted ["); providerUri = new URI(providerUriStr);
String prefix = " "; } catch (URISyntaxException e) {
for (KeyProvider kp: providers) { throw new IOException(e);
builder.append(prefix + kp.toString());
prefix = ", ";
}
builder.append("]");
throw new IOException(builder.toString());
} }
KeyProviderCryptoExtension provider = KeyProviderCryptoExtension KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
.createKeyProviderCryptoExtension(providers.get(0)); if (keyProvider == null) {
if (provider.isTransient()) { throw new IOException("Could not instantiate KeyProvider from " +
throw new IOException("KeyProvider " + provider.toString() DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" +
providerUriStr +"'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider."); + " was found but it is a transient provider.");
} }
return provider; KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider;
} }
} }

View File

@ -21,39 +21,46 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
/** /**
* HDFS permission subclass used to indicate an ACL is present. The ACL bit is * HDFS permission subclass used to indicate an ACL is present and/or that the
* not visible directly to users of {@link FsPermission} serialization. This is * underlying file/dir is encrypted. The ACL/encrypted bits are not visible
* directly to users of {@link FsPermission} serialization. This is
* done for backwards compatibility in case any existing clients assume the * done for backwards compatibility in case any existing clients assume the
* value of FsPermission is in a particular range. * value of FsPermission is in a particular range.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class FsAclPermission extends FsPermission { public class FsPermissionExtension extends FsPermission {
private final static short ACL_BIT = 1 << 12; private final static short ACL_BIT = 1 << 12;
private final static short ENCRYPTED_BIT = 1 << 13;
private final boolean aclBit; private final boolean aclBit;
private final boolean encryptedBit;
/** /**
* Constructs a new FsAclPermission based on the given FsPermission. * Constructs a new FsPermissionExtension based on the given FsPermission.
* *
* @param perm FsPermission containing permission bits * @param perm FsPermission containing permission bits
*/ */
public FsAclPermission(FsPermission perm) { public FsPermissionExtension(FsPermission perm, boolean hasAcl,
boolean isEncrypted) {
super(perm.toShort()); super(perm.toShort());
aclBit = true; aclBit = hasAcl;
encryptedBit = isEncrypted;
} }
/** /**
* Creates a new FsAclPermission by calling the base class constructor. * Creates a new FsPermissionExtension by calling the base class constructor.
* *
* @param perm short containing permission bits * @param perm short containing permission bits
*/ */
public FsAclPermission(short perm) { public FsPermissionExtension(short perm) {
super(perm); super(perm);
aclBit = (perm & ACL_BIT) != 0; aclBit = (perm & ACL_BIT) != 0;
encryptedBit = (perm & ENCRYPTED_BIT) != 0;
} }
@Override @Override
public short toExtendedShort() { public short toExtendedShort() {
return (short)(toShort() | (aclBit ? ACL_BIT : 0)); return (short)(toShort() |
(aclBit ? ACL_BIT : 0) | (encryptedBit ? ENCRYPTED_BIT : 0));
} }
@Override @Override
@ -61,6 +68,11 @@ public boolean getAclBit() {
return aclBit; return aclBit;
} }
@Override
public boolean getEncryptedBit() {
return encryptedBit;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
// This intentionally delegates to the base class. This is only overridden // This intentionally delegates to the base class. This is only overridden

View File

@ -67,7 +67,7 @@
import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.FsAclPermission; import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -1282,7 +1282,7 @@ public static FsPermissionProto convert(FsPermission p) {
} }
public static FsPermission convert(FsPermissionProto p) { public static FsPermission convert(FsPermissionProto p) {
return new FsAclPermission((short)p.getPerm()); return new FsPermissionExtension((short)p.getPerm());
} }

View File

@ -312,7 +312,22 @@ BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
int count = 0; int count = 0;
for (EncryptionZoneInt ezi : tailMap.values()) { for (EncryptionZoneInt ezi : tailMap.values()) {
zones.add(new EncryptionZone(getFullPathName(ezi), /*
Skip EZs that are only present in snapshots. Re-resolve the path to
see if the path's current inode ID matches EZ map's INode ID.
INode#getFullPathName simply calls getParent recursively, so will return
the INode's parents at the time it was snapshotted. It will not
contain a reference INode.
*/
final String pathName = getFullPathName(ezi);
INodesInPath iip = dir.getINodesInPath(pathName, false);
INode lastINode = iip.getLastINode();
if (lastINode == null || lastINode.getId() != ezi.getINodeId()) {
continue;
}
// Add the EZ to the result list
zones.add(new EncryptionZone(pathName,
ezi.getKeyName(), ezi.getINodeId())); ezi.getKeyName(), ezi.getINodeId()));
count++; count++;
if (count >= numResponses) { if (count >= numResponses) {

View File

@ -64,7 +64,7 @@
import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException; import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException; import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
import org.apache.hadoop.hdfs.protocol.FsAclPermission; import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
@ -2372,16 +2372,24 @@ HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
long size = 0; // length is zero for directories long size = 0; // length is zero for directories
short replication = 0; short replication = 0;
long blocksize = 0; long blocksize = 0;
final boolean isEncrypted;
final FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot);
if (node.isFile()) { if (node.isFile()) {
final INodeFile fileNode = node.asFile(); final INodeFile fileNode = node.asFile();
size = fileNode.computeFileSize(snapshot); size = fileNode.computeFileSize(snapshot);
replication = fileNode.getFileReplication(snapshot); replication = fileNode.getFileReplication(snapshot);
blocksize = fileNode.getPreferredBlockSize(); blocksize = fileNode.getPreferredBlockSize();
isEncrypted = (feInfo != null) ||
(isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
} else {
isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
} }
int childrenNum = node.isDirectory() ? int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0; node.asDirectory().getChildrenNum(snapshot) : 0;
FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot);
return new HdfsFileStatus( return new HdfsFileStatus(
size, size,
@ -2390,7 +2398,7 @@ HdfsFileStatus createFileStatus(byte[] path, INode node, byte storagePolicy,
blocksize, blocksize,
node.getModificationTime(snapshot), node.getModificationTime(snapshot),
node.getAccessTime(snapshot), node.getAccessTime(snapshot),
getPermissionForFileStatus(node, snapshot), getPermissionForFileStatus(node, snapshot, isEncrypted),
node.getUserName(snapshot), node.getUserName(snapshot),
node.getGroupName(snapshot), node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null, node.isSymlink() ? node.asSymlink().getSymlink() : null,
@ -2411,6 +2419,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
short replication = 0; short replication = 0;
long blocksize = 0; long blocksize = 0;
LocatedBlocks loc = null; LocatedBlocks loc = null;
final boolean isEncrypted;
final FileEncryptionInfo feInfo = isRawPath ? null : final FileEncryptionInfo feInfo = isRawPath ? null :
getFileEncryptionInfo(node, snapshot); getFileEncryptionInfo(node, snapshot);
if (node.isFile()) { if (node.isFile()) {
@ -2430,6 +2439,10 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
if (loc == null) { if (loc == null) {
loc = new LocatedBlocks(); loc = new LocatedBlocks();
} }
isEncrypted = (feInfo != null) ||
(isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
} else {
isEncrypted = isInAnEZ(INodesInPath.fromINode(node));
} }
int childrenNum = node.isDirectory() ? int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0; node.asDirectory().getChildrenNum(snapshot) : 0;
@ -2438,7 +2451,7 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
new HdfsLocatedFileStatus(size, node.isDirectory(), replication, new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
blocksize, node.getModificationTime(snapshot), blocksize, node.getModificationTime(snapshot),
node.getAccessTime(snapshot), node.getAccessTime(snapshot),
getPermissionForFileStatus(node, snapshot), getPermissionForFileStatus(node, snapshot, isEncrypted),
node.getUserName(snapshot), node.getGroupName(snapshot), node.getUserName(snapshot), node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null, path, node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
node.getId(), loc, childrenNum, feInfo, storagePolicy); node.getId(), loc, childrenNum, feInfo, storagePolicy);
@ -2454,17 +2467,21 @@ private HdfsLocatedFileStatus createLocatedFileStatus(byte[] path, INode node,
/** /**
* Returns an inode's FsPermission for use in an outbound FileStatus. If the * Returns an inode's FsPermission for use in an outbound FileStatus. If the
* inode has an ACL, then this method will convert to a FsAclPermission. * inode has an ACL or is for an encrypted file/dir, then this method will
* return an FsPermissionExtension.
* *
* @param node INode to check * @param node INode to check
* @param snapshot int snapshot ID * @param snapshot int snapshot ID
* @param isEncrypted boolean true if the file/dir is encrypted
* @return FsPermission from inode, with ACL bit on if the inode has an ACL * @return FsPermission from inode, with ACL bit on if the inode has an ACL
* and encrypted bit on if it represents an encrypted file/dir.
*/ */
private static FsPermission getPermissionForFileStatus(INode node, private static FsPermission getPermissionForFileStatus(INode node,
int snapshot) { int snapshot, boolean isEncrypted) {
FsPermission perm = node.getFsPermission(snapshot); FsPermission perm = node.getFsPermission(snapshot);
if (node.getAclFeature(snapshot) != null) { boolean hasAcl = node.getAclFeature(snapshot) != null;
perm = new FsAclPermission(perm); if (hasAcl || isEncrypted) {
perm = new FsPermissionExtension(perm, hasAcl, isEncrypted);
} }
return perm; return perm;
} }

View File

@ -181,9 +181,16 @@ private static String toString(final FsPermission permission) {
} }
/** Convert a string to a FsPermission object. */ /** Convert a string to a FsPermission object. */
private static FsPermission toFsPermission(final String s, Boolean aclBit) { private static FsPermission toFsPermission(final String s, Boolean aclBit,
Boolean encBit) {
FsPermission perm = new FsPermission(Short.parseShort(s, 8)); FsPermission perm = new FsPermission(Short.parseShort(s, 8));
return (aclBit != null && aclBit) ? new FsAclPermission(perm) : perm; final boolean aBit = (aclBit != null) ? aclBit : false;
final boolean eBit = (encBit != null) ? encBit : false;
if (aBit || eBit) {
return new FsPermissionExtension(perm, aBit, eBit);
} else {
return perm;
}
} }
static enum PathType { static enum PathType {
@ -215,6 +222,9 @@ public static String toJsonString(final HdfsFileStatus status,
if (perm.getAclBit()) { if (perm.getAclBit()) {
m.put("aclBit", true); m.put("aclBit", true);
} }
if (perm.getEncryptedBit()) {
m.put("encBit", true);
}
m.put("accessTime", status.getAccessTime()); m.put("accessTime", status.getAccessTime());
m.put("modificationTime", status.getModificationTime()); m.put("modificationTime", status.getModificationTime());
m.put("blockSize", status.getBlockSize()); m.put("blockSize", status.getBlockSize());
@ -242,7 +252,7 @@ public static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includes
final String owner = (String) m.get("owner"); final String owner = (String) m.get("owner");
final String group = (String) m.get("group"); final String group = (String) m.get("group");
final FsPermission permission = toFsPermission((String) m.get("permission"), final FsPermission permission = toFsPermission((String) m.get("permission"),
(Boolean)m.get("aclBit")); (Boolean)m.get("aclBit"), (Boolean)m.get("encBit"));
final long aTime = (Long) m.get("accessTime"); final long aTime = (Long) m.get("accessTime");
final long mTime = (Long) m.get("modificationTime"); final long mTime = (Long) m.get("modificationTime");
final long blockSize = (Long) m.get("blockSize"); final long blockSize = (Long) m.get("blockSize");

View File

@ -2138,4 +2138,12 @@
</description> </description>
</property> </property>
<property>
<name>dfs.encryption.key.provider.uri</name>
<description>
The KeyProvider to use when interacting with encryption keys used
when reading and writing to an encryption zone.
</description>
</property>
</configuration> </configuration>

View File

@ -85,6 +85,12 @@ Transparent Encryption in HDFS
A necessary prerequisite is an instance of the KMS, as well as a backing key store for the KMS. A necessary prerequisite is an instance of the KMS, as well as a backing key store for the KMS.
See the {{{../../hadoop-kms/index.html}KMS documentation}} for more information. See the {{{../../hadoop-kms/index.html}KMS documentation}} for more information.
** Configuring the cluster KeyProvider
*** dfs.encryption.key.provider.uri
The KeyProvider to use when interacting with encryption keys used when reading and writing to an encryption zone.
** Selecting an encryption algorithm and codec ** Selecting an encryption algorithm and codec
*** hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE *** hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE

View File

@ -66,7 +66,7 @@ public void setUp() throws Exception {
tmpDir = new File(System.getProperty("test.build.data", "target"), tmpDir = new File(System.getProperty("test.build.data", "target"),
UUID.randomUUID().toString()).getAbsoluteFile(); UUID.randomUUID().toString()).getAbsoluteFile();
final Path jksPath = new Path(tmpDir.toString(), "test.jks"); final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -25,7 +25,9 @@
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.io.StringReader; import java.io.StringReader;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.URI;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -47,6 +49,7 @@
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper; import org.apache.hadoop.fs.FileContextTestWrapper;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FileSystemTestWrapper; import org.apache.hadoop.fs.FileSystemTestWrapper;
@ -124,7 +127,7 @@ public void setup() throws Exception {
// Set up java key store // Set up java key store
String testRoot = fsHelper.getTestRootDir(); String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile(); testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, getKeyProviderURI()); conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing // Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES, conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,
@ -669,7 +672,8 @@ public void testCipherSuiteNegotiation() throws Exception {
// Check KeyProvider state // Check KeyProvider state
// Flushing the KP on the NN, since it caches, and init a test one // Flushing the KP on the NN, since it caches, and init a test one
cluster.getNamesystem().getProvider().flush(); cluster.getNamesystem().getProvider().flush();
KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0); KeyProvider provider = KeyProviderFactory
.get(new URI(conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI)), conf);
List<String> keys = provider.getKeys(); List<String> keys = provider.getKeys();
assertEquals("Expected NN to have created one key per zone", 1, assertEquals("Expected NN to have created one key per zone", 1,
keys.size()); keys.size());
@ -693,7 +697,7 @@ public void testCipherSuiteNegotiation() throws Exception {
public void testCreateEZWithNoProvider() throws Exception { public void testCreateEZWithNoProvider() throws Exception {
// Unset the key provider and make sure EZ ops don't work // Unset the key provider and make sure EZ ops don't work
final Configuration clusterConf = cluster.getConfiguration(0); final Configuration clusterConf = cluster.getConfiguration(0);
clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ""); clusterConf.unset(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI);
cluster.restartNameNode(true); cluster.restartNameNode(true);
cluster.waitActive(); cluster.waitActive();
final Path zone1 = new Path("/zone1"); final Path zone1 = new Path("/zone1");
@ -705,13 +709,100 @@ public void testCreateEZWithNoProvider() throws Exception {
assertExceptionContains("since no key provider is available", e); assertExceptionContains("since no key provider is available", e);
} }
final Path jksPath = new Path(testRootDir.toString(), "test.jks"); final Path jksPath = new Path(testRootDir.toString(), "test.jks");
clusterConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, clusterConf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri() JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
); );
// Try listing EZs as well // Try listing EZs as well
assertNumZones(0); assertNumZones(0);
} }
@Test(timeout = 120000)
public void testIsEncryptedMethod() throws Exception {
doTestIsEncryptedMethod(new Path("/"));
doTestIsEncryptedMethod(new Path("/.reserved/raw"));
}
private void doTestIsEncryptedMethod(Path prefix) throws Exception {
try {
dTIEM(prefix);
} finally {
for (FileStatus s : fsWrapper.listStatus(prefix)) {
fsWrapper.delete(s.getPath(), true);
}
}
}
private void dTIEM(Path prefix) throws Exception {
final HdfsAdmin dfsAdmin =
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
// Create an unencrypted file to check isEncrypted returns false
final Path baseFile = new Path(prefix, "base");
fsWrapper.createFile(baseFile);
FileStatus stat = fsWrapper.getFileStatus(baseFile);
assertFalse("Expected isEncrypted to return false for " + baseFile,
stat.isEncrypted());
// Create an encrypted file to check isEncrypted returns true
final Path zone = new Path(prefix, "zone");
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
dfsAdmin.createEncryptionZone(zone, TEST_KEY);
final Path encFile = new Path(zone, "encfile");
fsWrapper.createFile(encFile);
stat = fsWrapper.getFileStatus(encFile);
assertTrue("Expected isEncrypted to return true for enc file" + encFile,
stat.isEncrypted());
// check that it returns true for an ez root
stat = fsWrapper.getFileStatus(zone);
assertTrue("Expected isEncrypted to return true for ezroot",
stat.isEncrypted());
// check that it returns true for a dir in the ez
final Path zoneSubdir = new Path(zone, "subdir");
fsWrapper.mkdir(zoneSubdir, FsPermission.getDirDefault(), true);
stat = fsWrapper.getFileStatus(zoneSubdir);
assertTrue(
"Expected isEncrypted to return true for ez subdir " + zoneSubdir,
stat.isEncrypted());
// check that it returns false for a non ez dir
final Path nonEzDirPath = new Path(prefix, "nonzone");
fsWrapper.mkdir(nonEzDirPath, FsPermission.getDirDefault(), true);
stat = fsWrapper.getFileStatus(nonEzDirPath);
assertFalse(
"Expected isEncrypted to return false for directory " + nonEzDirPath,
stat.isEncrypted());
// check that it returns true for listings within an ez
FileStatus[] statuses = fsWrapper.listStatus(zone);
for (FileStatus s : statuses) {
assertTrue("Expected isEncrypted to return true for ez stat " + zone,
s.isEncrypted());
}
statuses = fsWrapper.listStatus(encFile);
for (FileStatus s : statuses) {
assertTrue(
"Expected isEncrypted to return true for ez file stat " + encFile,
s.isEncrypted());
}
// check that it returns false for listings outside an ez
statuses = fsWrapper.listStatus(nonEzDirPath);
for (FileStatus s : statuses) {
assertFalse(
"Expected isEncrypted to return false for nonez stat " + nonEzDirPath,
s.isEncrypted());
}
statuses = fsWrapper.listStatus(baseFile);
for (FileStatus s : statuses) {
assertFalse(
"Expected isEncrypted to return false for non ez stat " + baseFile,
s.isEncrypted());
}
}
private class MyInjector extends EncryptionFaultInjector { private class MyInjector extends EncryptionFaultInjector {
int generateCount; int generateCount;
CountDownLatch ready; CountDownLatch ready;
@ -940,6 +1031,9 @@ public void testFsckOnEncryptionZones() throws Exception {
*/ */
@Test(timeout = 60000) @Test(timeout = 60000)
public void testSnapshotsOnEncryptionZones() throws Exception { public void testSnapshotsOnEncryptionZones() throws Exception {
final String TEST_KEY2 = "testkey2";
DFSTestUtil.createKey(TEST_KEY2, cluster, conf);
final int len = 8196; final int len = 8196;
final Path zoneParent = new Path("/zones"); final Path zoneParent = new Path("/zones");
final Path zone = new Path(zoneParent, "zone"); final Path zone = new Path(zoneParent, "zone");
@ -954,7 +1048,8 @@ public void testSnapshotsOnEncryptionZones() throws Exception {
assertEquals("Got unexpected ez path", zone.toString(), assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString()); dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());
// Now delete the encryption zone, recreate the dir, and take another snapshot // Now delete the encryption zone, recreate the dir, and take another
// snapshot
fsWrapper.delete(zone, true); fsWrapper.delete(zone, true);
fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true); fsWrapper.mkdir(zone, FsPermission.getDirDefault(), true);
final Path snap2 = fs.createSnapshot(zoneParent); final Path snap2 = fs.createSnapshot(zoneParent);
@ -963,11 +1058,35 @@ public void testSnapshotsOnEncryptionZones() throws Exception {
dfsAdmin.getEncryptionZoneForPath(snap2Zone)); dfsAdmin.getEncryptionZoneForPath(snap2Zone));
// Create the encryption zone again // Create the encryption zone again
dfsAdmin.createEncryptionZone(zone, TEST_KEY); dfsAdmin.createEncryptionZone(zone, TEST_KEY2);
final Path snap3 = fs.createSnapshot(zoneParent); final Path snap3 = fs.createSnapshot(zoneParent);
final Path snap3Zone = new Path(snap3, zone.getName()); final Path snap3Zone = new Path(snap3, zone.getName());
// Check that snap3's EZ has the correct settings
EncryptionZone ezSnap3 = dfsAdmin.getEncryptionZoneForPath(snap3Zone);
assertEquals("Got unexpected ez path", zone.toString(), assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap3Zone).getPath().toString()); ezSnap3.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY2, ezSnap3.getKeyName());
// Check that older snapshots still have the old EZ settings
EncryptionZone ezSnap1 = dfsAdmin.getEncryptionZoneForPath(snap1Zone);
assertEquals("Got unexpected ez path", zone.toString(),
ezSnap1.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY, ezSnap1.getKeyName());
// Check that listEZs only shows the current filesystem state
ArrayList<EncryptionZone> listZones = Lists.newArrayList();
RemoteIterator<EncryptionZone> it = dfsAdmin.listEncryptionZones();
while (it.hasNext()) {
listZones.add(it.next());
}
for (EncryptionZone z: listZones) {
System.out.println(z);
}
assertEquals("Did not expect additional encryption zones!", 1,
listZones.size());
EncryptionZone listZone = listZones.get(0);
assertEquals("Got unexpected ez path", zone.toString(),
listZone.getPath().toString());
assertEquals("Unexpected ez key", TEST_KEY2, listZone.getKeyName());
// Verify contents of the snapshotted file // Verify contents of the snapshotted file
final Path snapshottedZoneFile = new Path( final Path snapshottedZoneFile = new Path(
@ -975,7 +1094,8 @@ public void testSnapshotsOnEncryptionZones() throws Exception {
assertEquals("Contents of snapshotted file have changed unexpectedly", assertEquals("Contents of snapshotted file have changed unexpectedly",
contents, DFSTestUtil.readFile(fs, snapshottedZoneFile)); contents, DFSTestUtil.readFile(fs, snapshottedZoneFile));
// Now delete the snapshots out of order and verify the zones are still correct // Now delete the snapshots out of order and verify the zones are still
// correct
fs.deleteSnapshot(zoneParent, snap2.getName()); fs.deleteSnapshot(zoneParent, snap2.getName());
assertEquals("Got unexpected ez path", zone.toString(), assertEquals("Got unexpected ez path", zone.toString(),
dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString()); dfsAdmin.getEncryptionZoneForPath(snap1Zone).getPath().toString());

View File

@ -20,7 +20,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -60,7 +59,7 @@ public void setupCluster() throws Exception {
fsHelper = new FileSystemTestHelper(); fsHelper = new FileSystemTestHelper();
String testRoot = fsHelper.getTestRootDir(); String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile(); testRootDir = new File(testRoot).getAbsoluteFile();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks" JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir + "/test.jks"
); );

View File

@ -24,7 +24,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestWrapper; import org.apache.hadoop.fs.FileContextTestWrapper;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -70,7 +69,7 @@ public void setup() throws Exception {
String testRoot = fsHelper.getTestRootDir(); String testRoot = fsHelper.getTestRootDir();
File testRootDir = new File(testRoot).getAbsoluteFile(); File testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks"); final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri() JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
); );
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

View File

@ -39,7 +39,7 @@
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.FsAclPermission; import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -822,7 +822,8 @@ public void testSetPermissionCannotSetAclBit() throws IOException {
fs.setPermission(path, FsPermission.createImmutable((short)0700)); fs.setPermission(path, FsPermission.createImmutable((short)0700));
assertPermission((short)0700); assertPermission((short)0700);
fs.setPermission(path, fs.setPermission(path,
new FsAclPermission(FsPermission.createImmutable((short)0755))); new FsPermissionExtension(FsPermission.
createImmutable((short)0755), true, true));
INode inode = cluster.getNamesystem().getFSDirectory().getNode( INode inode = cluster.getNamesystem().getFSDirectory().getNode(
path.toUri().getPath(), false); path.toUri().getPath(), false);
assertNotNull(inode); assertNotNull(inode);

View File

@ -377,6 +377,12 @@ Release 2.6.0 - UNRELEASED
YARN-2529. Generic history service RPC interface doesn't work when service YARN-2529. Generic history service RPC interface doesn't work when service
authorization is enabled. (Zhijie Shen via jianhe) authorization is enabled. (Zhijie Shen via jianhe)
YARN-2558. Updated ContainerTokenIdentifier#read/write to use
ContainerId#getContainerId. (Tsuyoshi OZAWA via jianhe)
YARN-2559. Fixed NPE in SystemMetricsPublisher when retrieving
FinalApplicationStatus. (Zhijie Shen via jianhe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -128,7 +128,7 @@ public void write(DataOutput out) throws IOException {
out.writeLong(applicationId.getClusterTimestamp()); out.writeLong(applicationId.getClusterTimestamp());
out.writeInt(applicationId.getId()); out.writeInt(applicationId.getId());
out.writeInt(applicationAttemptId.getAttemptId()); out.writeInt(applicationAttemptId.getAttemptId());
out.writeInt(this.containerId.getId()); out.writeLong(this.containerId.getContainerId());
out.writeUTF(this.nmHostAddr); out.writeUTF(this.nmHostAddr);
out.writeUTF(this.appSubmitter); out.writeUTF(this.appSubmitter);
out.writeInt(this.resource.getMemory()); out.writeInt(this.resource.getMemory());
@ -147,7 +147,7 @@ public void readFields(DataInput in) throws IOException {
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, in.readInt()); ApplicationAttemptId.newInstance(applicationId, in.readInt());
this.containerId = this.containerId =
ContainerId.newInstance(applicationAttemptId, in.readInt()); ContainerId.newInstance(applicationAttemptId, in.readLong());
this.nmHostAddr = in.readUTF(); this.nmHostAddr = in.readUTF();
this.appSubmitter = in.readUTF(); this.appSubmitter = in.readUTF();
int memory = in.readInt(); int memory = in.readInt();

View File

@ -160,7 +160,7 @@ public void appAttemptRegistered(RMAppAttempt appAttempt,
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt, public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState state, long finishedTime) { RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
if (publishSystemMetrics) { if (publishSystemMetrics) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent( new AppAttemptFinishedEvent(
@ -168,8 +168,10 @@ public void appAttemptFinished(RMAppAttempt appAttempt,
appAttempt.getTrackingUrl(), appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(), appAttempt.getOriginalTrackingUrl(),
appAttempt.getDiagnostics(), appAttempt.getDiagnostics(),
appAttempt.getFinalApplicationStatus(), // app will get the final status from app attempt, or create one
RMServerUtils.createApplicationAttemptState(state), // based on app state if it doesn't exist
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationAttemptState(appAttemtpState),
finishedTime)); finishedTime));
} }
} }

View File

@ -1159,8 +1159,10 @@ public void transition(RMAppAttemptImpl appAttempt,
appAttempt.rmContext.getRMApplicationHistoryWriter() appAttempt.rmContext.getRMApplicationHistoryWriter()
.applicationAttemptFinished(appAttempt, finalAttemptState); .applicationAttemptFinished(appAttempt, finalAttemptState);
appAttempt.rmContext.getSystemMetricsPublisher() appAttempt.rmContext.getSystemMetricsPublisher()
.appAttemptFinished( .appAttemptFinished(appAttempt, finalAttemptState,
appAttempt, finalAttemptState, System.currentTimeMillis()); appAttempt.rmContext.getRMApps().get(
appAttempt.applicationAttemptId.getApplicationId()),
System.currentTimeMillis());
} }
} }

View File

@ -174,7 +174,9 @@ public void testPublishAppAttemptMetrics() throws Exception {
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, RMApp app = mock(RMApp.class);
when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED);
metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app,
Integer.MAX_VALUE + 2L); Integer.MAX_VALUE + 2L);
TimelineEntity entity = null; TimelineEntity entity = null;
do { do {
@ -222,7 +224,7 @@ public void testPublishAppAttemptMetrics() throws Exception {
event.getEventInfo().get( event.getEventInfo().get(
AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)); AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
Assert.assertEquals( Assert.assertEquals(
appAttempt.getFinalApplicationStatus().toString(), FinalApplicationStatus.UNDEFINED.toString(),
event.getEventInfo().get( event.getEventInfo().get(
AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO)); AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals( Assert.assertEquals(
@ -340,8 +342,6 @@ private static RMAppAttempt createRMAppAttempt(
when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
when(appAttempt.getOriginalTrackingUrl()).thenReturn( when(appAttempt.getOriginalTrackingUrl()).thenReturn(
"test original tracking url"); "test original tracking url");
when(appAttempt.getFinalApplicationStatus()).thenReturn(
FinalApplicationStatus.UNDEFINED);
return appAttempt; return appAttempt;
} }

View File

@ -76,6 +76,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
@ -92,7 +93,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -289,7 +289,6 @@ public void setUp() throws Exception {
Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler(); Mockito.doReturn(resourceScheduler).when(spyRMContext).getScheduler();
final String user = MockApps.newUserName();
final String queue = MockApps.newQueue(); final String queue = MockApps.newQueue();
submissionContext = mock(ApplicationSubmissionContext.class); submissionContext = mock(ApplicationSubmissionContext.class);
when(submissionContext.getQueue()).thenReturn(queue); when(submissionContext.getQueue()).thenReturn(queue);
@ -1385,7 +1384,7 @@ private void verifyApplicationAttemptFinished(RMAppAttemptState state) {
finalState = finalState =
ArgumentCaptor.forClass(RMAppAttemptState.class); ArgumentCaptor.forClass(RMAppAttemptState.class);
verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(), verify(publisher).appAttemptFinished(any(RMAppAttempt.class), finalState.capture(),
anyLong()); any(RMApp.class), anyLong());
Assert.assertEquals(state, finalState.getValue()); Assert.assertEquals(state, finalState.getValue());
} }

View File

@ -28,6 +28,9 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.LinkedList;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -158,6 +161,25 @@ public void testContainerManager() throws Exception {
} }
} }
} }
@Test (timeout = 500000)
public void testContainerManagerWithEpoch() throws Exception {
try {
yarnCluster = new MiniYARNCluster(TestContainerManagerSecurity.class
.getName(), 1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
// Testing for container token tampering
testContainerTokenWithEpoch(conf);
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
yarnCluster = null;
}
}
}
private void testNMTokens(Configuration conf) throws Exception { private void testNMTokens(Configuration conf) throws Exception {
NMTokenSecretManagerInRM nmTokenSecretManagerRM = NMTokenSecretManagerInRM nmTokenSecretManagerRM =
@ -603,4 +625,74 @@ private void testContainerToken(Configuration conf) throws IOException,
Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId,
containerToken, nmToken, true).contains(sb.toString())); containerToken, nmToken, true).contains(sb.toString()));
} }
/**
* This tests whether a containerId is serialized/deserialized with epoch.
*
* @throws IOException
* @throws InterruptedException
* @throws YarnException
*/
private void testContainerTokenWithEpoch(Configuration conf)
throws IOException, InterruptedException, YarnException {
LOG.info("Running test for serializing/deserializing containerIds");
NMTokenSecretManagerInRM nmTokenSecretManagerInRM =
yarnCluster.getResourceManager().getRMContext()
.getNMTokenSecretManager();
ApplicationId appId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
ContainerId cId = ContainerId.newInstance(appAttemptId, (5L << 40) | 3L);
NodeManager nm = yarnCluster.getNodeManager(0);
NMTokenSecretManagerInNM nmTokenSecretManagerInNM =
nm.getNMContext().getNMTokenSecretManager();
String user = "test";
waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm);
NodeId nodeId = nm.getNMContext().getNodeId();
// Both id should be equal.
Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(),
nmTokenSecretManagerInRM.getCurrentKey().getKeyId());
// Creating a normal Container Token
RMContainerTokenSecretManager containerTokenSecretManager =
yarnCluster.getResourceManager().getRMContext().
getContainerTokenSecretManager();
Resource r = Resource.newInstance(1230, 2);
Token containerToken =
containerTokenSecretManager.createContainerToken(cId, nodeId, user, r,
Priority.newInstance(0), 0);
ByteArrayDataInput input = ByteStreams.newDataInput(
containerToken.getIdentifier().array());
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier();
containerTokenIdentifier.readFields(input);
Assert.assertEquals(cId, containerTokenIdentifier.getContainerID());
Assert.assertEquals(
cId.toString(), containerTokenIdentifier.getContainerID().toString());
Token nmToken =
nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user);
YarnRPC rpc = YarnRPC.create(conf);
testStartContainer(rpc, appAttemptId, nodeId, containerToken, nmToken,
false);
List<ContainerId> containerIds = new LinkedList<ContainerId>();
containerIds.add(cId);
ContainerManagementProtocol proxy
= getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user);
GetContainerStatusesResponse res = proxy.getContainerStatuses(
GetContainerStatusesRequest.newInstance(containerIds));
Assert.assertNotNull(res.getContainerStatuses().get(0));
Assert.assertEquals(
cId, res.getContainerStatuses().get(0).getContainerId());
Assert.assertEquals(cId.toString(),
res.getContainerStatuses().get(0).getContainerId().toString());
}
} }