HDFS-12396. Webhdfs file system should get delegation token from kms provider. Contributed by Rushabh S Shah.
This commit is contained in:
parent
37ca416950
commit
404eab4dc0
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* File systems that support Encryption Zones have to implement this interface.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface KeyProviderTokenIssuer {
|
||||
|
||||
KeyProvider getKeyProvider() throws IOException;
|
||||
|
||||
URI getKeyProviderUri() throws IOException;
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.crypto.key;
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
|
||||
@ -58,18 +57,15 @@
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
@ -171,7 +167,6 @@
|
||||
import org.apache.hadoop.net.DNS;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
@ -207,7 +202,6 @@
|
||||
public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||
DataEncryptionKeyFactory {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
|
||||
private static final String DFS_KMS_PREFIX = "dfs-kms-";
|
||||
|
||||
private final Configuration conf;
|
||||
private final Tracer tracer;
|
||||
@ -936,55 +930,6 @@ private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the crypto protocol version from the provided FileEncryptionInfo,
|
||||
* checking to see if this version is supported by.
|
||||
*
|
||||
* @param feInfo FileEncryptionInfo
|
||||
* @return CryptoProtocolVersion from the feInfo
|
||||
* @throws IOException if the protocol version is unsupported.
|
||||
*/
|
||||
private static CryptoProtocolVersion getCryptoProtocolVersion(
|
||||
FileEncryptionInfo feInfo) throws IOException {
|
||||
final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
|
||||
if (!CryptoProtocolVersion.supports(version)) {
|
||||
throw new IOException("Client does not support specified " +
|
||||
"CryptoProtocolVersion " + version.getDescription() + " version " +
|
||||
"number" + version.getVersion());
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
|
||||
* and the available CryptoCodecs configured in the Configuration.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param feInfo FileEncryptionInfo
|
||||
* @return CryptoCodec
|
||||
* @throws IOException if no suitable CryptoCodec for the CipherSuite is
|
||||
* available.
|
||||
*/
|
||||
private static CryptoCodec getCryptoCodec(Configuration conf,
|
||||
FileEncryptionInfo feInfo) throws IOException {
|
||||
final CipherSuite suite = feInfo.getCipherSuite();
|
||||
if (suite.equals(CipherSuite.UNKNOWN)) {
|
||||
throw new IOException("NameNode specified unknown CipherSuite with ID "
|
||||
+ suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
|
||||
}
|
||||
final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
|
||||
if (codec == null) {
|
||||
throw new UnknownCipherSuiteException(
|
||||
"No configuration found for the cipher suite "
|
||||
+ suite.getConfigSuffix() + " prefixed with "
|
||||
+ HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
|
||||
+ ". Please see the example configuration "
|
||||
+ "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
|
||||
+ "at core-default.xml for details.");
|
||||
}
|
||||
return codec;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the stream in a CryptoInputStream if the underlying file is
|
||||
* encrypted.
|
||||
@ -995,8 +940,8 @@ public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
|
||||
if (feInfo != null) {
|
||||
// File is encrypted, wrap the stream in a crypto stream.
|
||||
// Currently only one version, so no special logic based on the version #
|
||||
getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = getCryptoCodec(conf, feInfo);
|
||||
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
|
||||
final KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||
final CryptoInputStream cryptoIn =
|
||||
new CryptoInputStream(dfsis, codec, decrypted.getMaterial(),
|
||||
@ -1027,8 +972,8 @@ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
|
||||
if (feInfo != null) {
|
||||
// File is encrypted, wrap the stream in a crypto stream.
|
||||
// Currently only one version, so no special logic based on the version #
|
||||
getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = getCryptoCodec(conf, feInfo);
|
||||
HdfsKMSUtil.getCryptoProtocolVersion(feInfo);
|
||||
final CryptoCodec codec = HdfsKMSUtil.getCryptoCodec(conf, feInfo);
|
||||
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||
final CryptoOutputStream cryptoOut =
|
||||
new CryptoOutputStream(dfsos, codec,
|
||||
@ -2983,51 +2928,9 @@ DFSHedgedReadMetrics getHedgedReadMetrics() {
|
||||
return HEDGED_READ_METRIC;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a key to map namenode uri to key provider uri.
|
||||
* Tasks will lookup this key to find key Provider.
|
||||
*/
|
||||
public Text getKeyProviderMapKey() {
|
||||
return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
|
||||
+"://" + namenodeUri.getAuthority());
|
||||
}
|
||||
|
||||
/**
|
||||
* The key provider uri is searched in the following order.
|
||||
* 1. If there is a mapping in Credential's secrets map for namenode uri.
|
||||
* 2. From namenode getServerDefaults rpc.
|
||||
* 3. Finally fallback to local conf.
|
||||
* @return keyProviderUri if found from either of above 3 cases,
|
||||
* null otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
URI getKeyProviderUri() throws IOException {
|
||||
URI keyProviderUri = null;
|
||||
// Lookup the secret in credentials object for namenodeuri.
|
||||
Credentials credentials = ugi.getCredentials();
|
||||
byte[] keyProviderUriBytes = credentials.getSecretKey(getKeyProviderMapKey());
|
||||
if(keyProviderUriBytes != null) {
|
||||
keyProviderUri =
|
||||
URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
// Query the namenode for the key provider uri.
|
||||
FsServerDefaults serverDefaults = getServerDefaults();
|
||||
if (serverDefaults.getKeyProviderUri() != null) {
|
||||
if (!serverDefaults.getKeyProviderUri().isEmpty()) {
|
||||
keyProviderUri = URI.create(serverDefaults.getKeyProviderUri());
|
||||
}
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
// Last thing is to trust its own conf to be backwards compatible.
|
||||
String keyProviderUriStr = conf.getTrimmed(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
if (keyProviderUriStr != null && !keyProviderUriStr.isEmpty()) {
|
||||
keyProviderUri = URI.create(keyProviderUriStr);
|
||||
}
|
||||
return keyProviderUri;
|
||||
return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri,
|
||||
getServerDefaults().getKeyProviderUri(), conf);
|
||||
}
|
||||
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
|
@ -23,7 +23,6 @@
|
||||
import com.google.common.primitives.SignedBytes;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@ -54,7 +53,6 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -596,30 +594,6 @@ public static ReconfigurationProtocol createReconfigurationProtocolProxy(
|
||||
return new ReconfigurationProtocolTranslatorPB(addr, ticket, conf, factory);
|
||||
}
|
||||
|
||||
private static String keyProviderUriKeyName =
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
||||
|
||||
/**
|
||||
* Set the key provider uri configuration key name for creating key providers.
|
||||
* @param keyName The configuration key name.
|
||||
*/
|
||||
public static void setKeyProviderUriKeyName(final String keyName) {
|
||||
keyProviderUriKeyName = keyName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new KeyProvider from the given Configuration.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return new KeyProvider, or null if no provider was found.
|
||||
* @throws IOException if the KeyProvider is improperly specified in
|
||||
* the Configuration
|
||||
*/
|
||||
public static KeyProvider createKeyProvider(
|
||||
final Configuration conf) throws IOException {
|
||||
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
|
||||
}
|
||||
|
||||
public static Peer peerFromSocket(Socket socket)
|
||||
throws IOException {
|
||||
Peer peer;
|
||||
|
@ -26,7 +26,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BlockStoragePolicySpi;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
@ -124,7 +125,8 @@
|
||||
*****************************************************************/
|
||||
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
|
||||
@InterfaceStability.Unstable
|
||||
public class DistributedFileSystem extends FileSystem {
|
||||
public class DistributedFileSystem extends FileSystem
|
||||
implements KeyProviderTokenIssuer {
|
||||
private Path workingDir;
|
||||
private URI uri;
|
||||
private String homeDirPrefix =
|
||||
@ -2604,29 +2606,22 @@ public Void next(final FileSystem fs, final Path p)
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
return dfs.getKeyProviderUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
return dfs.getKeyProvider();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
|
||||
URI keyProviderUri = dfs.getKeyProviderUri();
|
||||
if (keyProviderUri != null) {
|
||||
KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
|
||||
Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
|
||||
addDelegationTokens(renewer, credentials);
|
||||
credentials.addSecretKey(dfs.getKeyProviderMapKey(),
|
||||
DFSUtilClient.string2Bytes(keyProviderUri.toString()));
|
||||
if (tokens != null && kpTokens != null) {
|
||||
Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
|
||||
System.arraycopy(tokens, 0, all, 0, tokens.length);
|
||||
System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
|
||||
tokens = all;
|
||||
} else {
|
||||
tokens = (tokens != null) ? tokens : kpTokens;
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
return HdfsKMSUtil.addDelegationTokensForKeyProvider(
|
||||
this, renewer, credentials, uri, tokens);
|
||||
}
|
||||
|
||||
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
|
||||
|
@ -0,0 +1,190 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
import org.apache.hadoop.crypto.CryptoCodec;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileEncryptionInfo;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
|
||||
/**
|
||||
* Utility class for key provider related methods in hdfs client package.
|
||||
*
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public final class HdfsKMSUtil {
|
||||
private static final String DFS_KMS_PREFIX = "dfs-kms-";
|
||||
private static String keyProviderUriKeyName =
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
||||
|
||||
private HdfsKMSUtil() { /* Hidden constructor */ }
|
||||
|
||||
/**
|
||||
* Creates a new KeyProvider from the given Configuration.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @return new KeyProvider, or null if no provider was found.
|
||||
* @throws IOException if the KeyProvider is improperly specified in
|
||||
* the Configuration
|
||||
*/
|
||||
public static KeyProvider createKeyProvider(
|
||||
final Configuration conf) throws IOException {
|
||||
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
|
||||
}
|
||||
|
||||
public static Token<?>[] addDelegationTokensForKeyProvider(
|
||||
KeyProviderTokenIssuer kpTokenIssuer, final String renewer,
|
||||
Credentials credentials, URI namenodeUri, Token<?>[] tokens)
|
||||
throws IOException {
|
||||
KeyProvider keyProvider = kpTokenIssuer.getKeyProvider();
|
||||
if (keyProvider != null) {
|
||||
KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension
|
||||
= KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(keyProvider);
|
||||
Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
|
||||
addDelegationTokens(renewer, credentials);
|
||||
credentials.addSecretKey(getKeyProviderMapKey(namenodeUri),
|
||||
DFSUtilClient.string2Bytes(
|
||||
kpTokenIssuer.getKeyProviderUri().toString()));
|
||||
if (tokens != null && kpTokens != null) {
|
||||
Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
|
||||
System.arraycopy(tokens, 0, all, 0, tokens.length);
|
||||
System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
|
||||
tokens = all;
|
||||
} else {
|
||||
tokens = (tokens != null) ? tokens : kpTokens;
|
||||
}
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the crypto protocol version from the provided FileEncryptionInfo,
|
||||
* checking to see if this version is supported by.
|
||||
*
|
||||
* @param feInfo FileEncryptionInfo
|
||||
* @return CryptoProtocolVersion from the feInfo
|
||||
* @throws IOException if the protocol version is unsupported.
|
||||
*/
|
||||
public static CryptoProtocolVersion getCryptoProtocolVersion(
|
||||
FileEncryptionInfo feInfo) throws IOException {
|
||||
final CryptoProtocolVersion version = feInfo.getCryptoProtocolVersion();
|
||||
if (!CryptoProtocolVersion.supports(version)) {
|
||||
throw new IOException("Client does not support specified " +
|
||||
"CryptoProtocolVersion " + version.getDescription() + " version " +
|
||||
"number" + version.getVersion());
|
||||
}
|
||||
return version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain a CryptoCodec based on the CipherSuite set in a FileEncryptionInfo
|
||||
* and the available CryptoCodecs configured in the Configuration.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param feInfo FileEncryptionInfo
|
||||
* @return CryptoCodec
|
||||
* @throws IOException if no suitable CryptoCodec for the CipherSuite is
|
||||
* available.
|
||||
*/
|
||||
public static CryptoCodec getCryptoCodec(Configuration conf,
|
||||
FileEncryptionInfo feInfo) throws IOException {
|
||||
final CipherSuite suite = feInfo.getCipherSuite();
|
||||
if (suite.equals(CipherSuite.UNKNOWN)) {
|
||||
throw new IOException("NameNode specified unknown CipherSuite with ID "
|
||||
+ suite.getUnknownValue() + ", cannot instantiate CryptoCodec.");
|
||||
}
|
||||
final CryptoCodec codec = CryptoCodec.getInstance(conf, suite);
|
||||
if (codec == null) {
|
||||
throw new UnknownCipherSuiteException(
|
||||
"No configuration found for the cipher suite "
|
||||
+ suite.getConfigSuffix() + " prefixed with "
|
||||
+ HADOOP_SECURITY_CRYPTO_CODEC_CLASSES_KEY_PREFIX
|
||||
+ ". Please see the example configuration "
|
||||
+ "hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE "
|
||||
+ "at core-default.xml for details.");
|
||||
}
|
||||
return codec;
|
||||
}
|
||||
|
||||
/**
|
||||
* The key provider uri is searched in the following order.
|
||||
* 1. If there is a mapping in Credential's secrets map for namenode uri.
|
||||
* 2. From namenode getServerDefaults call.
|
||||
* 3. Finally fallback to local conf.
|
||||
* @return keyProviderUri if found from either of above 3 cases,
|
||||
* null otherwise
|
||||
* @throws IOException
|
||||
*/
|
||||
public static URI getKeyProviderUri(UserGroupInformation ugi,
|
||||
URI namenodeUri, String keyProviderUriStr, Configuration conf)
|
||||
throws IOException {
|
||||
URI keyProviderUri = null;
|
||||
// Lookup the secret in credentials object for namenodeuri.
|
||||
Credentials credentials = ugi.getCredentials();
|
||||
byte[] keyProviderUriBytes =
|
||||
credentials.getSecretKey(getKeyProviderMapKey(namenodeUri));
|
||||
if(keyProviderUriBytes != null) {
|
||||
keyProviderUri =
|
||||
URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
if (keyProviderUriStr != null) {
|
||||
if (!keyProviderUriStr.isEmpty()) {
|
||||
keyProviderUri = URI.create(keyProviderUriStr);
|
||||
}
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
// Last thing is to trust its own conf to be backwards compatible.
|
||||
String keyProviderUriFromConf = conf.getTrimmed(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
if (keyProviderUriFromConf != null && !keyProviderUriFromConf.isEmpty()) {
|
||||
keyProviderUri = URI.create(keyProviderUriFromConf);
|
||||
}
|
||||
return keyProviderUri;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a key to map namenode uri to key provider uri.
|
||||
* Tasks will lookup this key to find key Provider.
|
||||
*/
|
||||
public static Text getKeyProviderMapKey(URI namenodeUri) {
|
||||
return new Text(DFS_KMS_PREFIX + namenodeUri.getScheme()
|
||||
+"://" + namenodeUri.getAuthority());
|
||||
}
|
||||
}
|
@ -56,6 +56,8 @@
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.io.input.BoundedInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
@ -84,6 +86,7 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsKMSUtil;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
@ -100,6 +103,7 @@
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
@ -107,6 +111,7 @@
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
@ -120,7 +125,7 @@
|
||||
/** A FileSystem for HDFS over the web. */
|
||||
public class WebHdfsFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable,
|
||||
TokenAspect.TokenManagementDelegator {
|
||||
TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer {
|
||||
public static final Logger LOG = LoggerFactory
|
||||
.getLogger(WebHdfsFileSystem.class);
|
||||
/** WebHdfs version. */
|
||||
@ -1633,6 +1638,13 @@ public synchronized void cancelDelegationToken(final Token<?> token
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
|
||||
return HdfsKMSUtil.addDelegationTokensForKeyProvider(this, renewer,
|
||||
credentials, getUri(), tokens);
|
||||
}
|
||||
|
||||
public BlockLocation[] getFileBlockLocations(final FileStatus status,
|
||||
final long offset, final long length) throws IOException {
|
||||
if (status == null) {
|
||||
@ -1822,6 +1834,29 @@ public void setRetryPolicy(RetryPolicy rp) {
|
||||
this.retryPolicy = rp;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public URI getKeyProviderUri() throws IOException {
|
||||
String keyProviderUri = null;
|
||||
try {
|
||||
keyProviderUri = getServerDefaults().getKeyProviderUri();
|
||||
} catch (UnsupportedOperationException e) {
|
||||
// This means server doesn't supports GETSERVERDEFAULTS call.
|
||||
// Do nothing, let keyProviderUri = null.
|
||||
}
|
||||
return HdfsKMSUtil.getKeyProviderUri(ugi, getUri(), keyProviderUri,
|
||||
getConf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public KeyProvider getKeyProvider() throws IOException {
|
||||
URI keyProviderUri = getKeyProviderUri();
|
||||
if (keyProviderUri == null) {
|
||||
return null;
|
||||
}
|
||||
return KMSUtil.createKeyProviderFromUri(getConf(), keyProviderUri);
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is used for opening, reading, and seeking files while using the
|
||||
* WebHdfsFileSystem. This class will invoke the retry policy when performing
|
||||
|
@ -1655,7 +1655,7 @@ public static void assertAllResultsEqual(Collection<?> objects)
|
||||
*/
|
||||
public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
|
||||
final Configuration conf) throws IOException {
|
||||
KeyProvider keyProvider = DFSUtilClient.createKeyProvider(conf);
|
||||
KeyProvider keyProvider = HdfsKMSUtil.createKeyProvider(conf);
|
||||
if (keyProvider == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -1705,7 +1705,8 @@ public void testProviderUriInCredentials() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
// Key provider uri should be in the secret map of credentials object with
|
||||
// namenode uri as key
|
||||
Text lookUpKey = client.getKeyProviderMapKey();
|
||||
Text lookUpKey = HdfsKMSUtil.getKeyProviderMapKey(
|
||||
cluster.getFileSystem().getUri());
|
||||
credentials.addSecretKey(lookUpKey,
|
||||
DFSUtilClient.string2Bytes(dummyKeyProvider));
|
||||
client.ugi.addCredentials(credentials);
|
||||
@ -1856,7 +1857,8 @@ public void testEncryptedReadWriteUsingDiffKeyProvider() throws Exception {
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||
DFSClient client = cluster.getFileSystem().getClient();
|
||||
Credentials credentials = new Credentials();
|
||||
Text lookUpKey = client.getKeyProviderMapKey();
|
||||
Text lookUpKey = HdfsKMSUtil.
|
||||
getKeyProviderMapKey(cluster.getFileSystem().getUri());
|
||||
credentials.addSecretKey(lookUpKey,
|
||||
DFSUtilClient.string2Bytes(getKeyProviderURI()));
|
||||
client.ugi.addCredentials(credentials);
|
||||
@ -1920,4 +1922,38 @@ public void testListEncryptionZonesWithSnapshots() throws Exception {
|
||||
dfsAdmin.listEncryptionZones().hasNext());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test returns mocked kms token when
|
||||
* {@link WebHdfsFileSystem#addDelegationTokens(String, Credentials)} method
|
||||
* is called.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void addMockKmsToken() throws Exception {
|
||||
UserGroupInformation.createRemoteUser("JobTracker");
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
KeyProvider keyProvider = Mockito.mock(KeyProvider.class, withSettings()
|
||||
.extraInterfaces(DelegationTokenExtension.class,
|
||||
CryptoExtension.class));
|
||||
Mockito.when(keyProvider.getConf()).thenReturn(conf);
|
||||
byte[] testIdentifier = "Test identifier for delegation token".getBytes();
|
||||
|
||||
Token<?> testToken = new Token(testIdentifier, new byte[0],
|
||||
new Text("kms-dt"), new Text());
|
||||
Mockito.when(((DelegationTokenExtension) keyProvider)
|
||||
.addDelegationTokens(anyString(), (Credentials) any()))
|
||||
.thenReturn(new Token<?>[] {testToken});
|
||||
|
||||
WebHdfsFileSystem webfsSpy = Mockito.spy(webfs);
|
||||
Mockito.doReturn(keyProvider).when(webfsSpy).getKeyProvider();
|
||||
|
||||
Credentials creds = new Credentials();
|
||||
final Token<?>[] tokens =
|
||||
webfsSpy.addDelegationTokens("JobTracker", creds);
|
||||
|
||||
Assert.assertEquals(2, tokens.length);
|
||||
Assert.assertEquals(tokens[1], testToken);
|
||||
Assert.assertEquals(1, creds.numberOfTokens());
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
||||
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
|
||||
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
||||
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
@ -28,6 +29,9 @@
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -134,4 +138,23 @@ public Boolean get() {
|
||||
}
|
||||
}, 1000, 60000);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method fetches the kms delegation token
|
||||
* for {@link WebHdfsFileSystem}.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void addDelegationTokenFromWebhdfsFileSystem() throws Exception {
|
||||
UserGroupInformation.createRemoteUser("JobTracker");
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
Credentials creds = new Credentials();
|
||||
final Token<?>[] tokens = webfs.addDelegationTokens("JobTracker", creds);
|
||||
|
||||
Assert.assertEquals(2, tokens.length);
|
||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND_STR,
|
||||
tokens[1].getKind().toString());
|
||||
Assert.assertEquals(2, creds.numberOfTokens());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user