HDFS-6724. Decrypt EDEK before creating CryptoInputStream/CryptoOutputStream. (wang)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1613490 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e670641bbf
commit
b9e4be4523
@ -59,6 +59,9 @@ fs-encryption (Unreleased)
|
|||||||
HDFS-6738. Remove unnecessary getEncryptionZoneForPath call in
|
HDFS-6738. Remove unnecessary getEncryptionZoneForPath call in
|
||||||
EZManager#createEncryptionZone. (clamb)
|
EZManager#createEncryptionZone. (clamb)
|
||||||
|
|
||||||
|
HDFS-6724. Decrypt EDEK before creating
|
||||||
|
CryptoInputStream/CryptoOutputStream. (wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -17,6 +17,9 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||||
|
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension
|
||||||
|
.EncryptedKeyVersion;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
@ -76,6 +79,7 @@
|
|||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
@ -100,6 +104,7 @@
|
|||||||
import org.apache.hadoop.crypto.CryptoCodec;
|
import org.apache.hadoop.crypto.CryptoCodec;
|
||||||
import org.apache.hadoop.crypto.CryptoInputStream;
|
import org.apache.hadoop.crypto.CryptoInputStream;
|
||||||
import org.apache.hadoop.crypto.CryptoOutputStream;
|
import org.apache.hadoop.crypto.CryptoOutputStream;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.BlockStorageLocation;
|
import org.apache.hadoop.fs.BlockStorageLocation;
|
||||||
import org.apache.hadoop.fs.CacheFlag;
|
import org.apache.hadoop.fs.CacheFlag;
|
||||||
@ -256,7 +261,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||||||
private final CryptoCodec codec;
|
private final CryptoCodec codec;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
List<CipherSuite> cipherSuites;
|
List<CipherSuite> cipherSuites;
|
||||||
|
@VisibleForTesting
|
||||||
|
KeyProviderCryptoExtension provider;
|
||||||
/**
|
/**
|
||||||
* DFSClient configuration
|
* DFSClient configuration
|
||||||
*/
|
*/
|
||||||
@ -591,7 +597,12 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
|
|||||||
this.codec = CryptoCodec.getInstance(conf);
|
this.codec = CryptoCodec.getInstance(conf);
|
||||||
this.cipherSuites = Lists.newArrayListWithCapacity(1);
|
this.cipherSuites = Lists.newArrayListWithCapacity(1);
|
||||||
cipherSuites.add(codec.getCipherSuite());
|
cipherSuites.add(codec.getCipherSuite());
|
||||||
|
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
|
||||||
|
if (provider == null) {
|
||||||
|
LOG.info("No KeyProvider found.");
|
||||||
|
} else {
|
||||||
|
LOG.info("Found KeyProvider: " + provider.toString());
|
||||||
|
}
|
||||||
int numResponseToDrop = conf.getInt(
|
int numResponseToDrop = conf.getInt(
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
|
||||||
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
|
||||||
@ -1291,6 +1302,25 @@ public BlockStorageLocation[] getBlockStorageLocations(
|
|||||||
return volumeBlockLocations;
|
return volumeBlockLocations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrypts a EDEK by consulting the KeyProvider.
|
||||||
|
*/
|
||||||
|
private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
|
||||||
|
feInfo) throws IOException {
|
||||||
|
if (provider == null) {
|
||||||
|
throw new IOException("No KeyProvider is configured, cannot access" +
|
||||||
|
" an encrypted file");
|
||||||
|
}
|
||||||
|
EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
|
||||||
|
feInfo.getEzKeyVersionName(), feInfo.getIV(),
|
||||||
|
feInfo.getEncryptedDataEncryptionKey());
|
||||||
|
try {
|
||||||
|
return provider.decryptEncryptedKey(ekv);
|
||||||
|
} catch (GeneralSecurityException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Wraps the stream in a CryptoInputStream if the underlying file is
|
* Wraps the stream in a CryptoInputStream if the underlying file is
|
||||||
* encrypted.
|
* encrypted.
|
||||||
@ -1300,13 +1330,14 @@ public HdfsDataInputStream createWrappedInputStream(DFSInputStream dfsis)
|
|||||||
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
|
final FileEncryptionInfo feInfo = dfsis.getFileEncryptionInfo();
|
||||||
if (feInfo != null) {
|
if (feInfo != null) {
|
||||||
// File is encrypted, wrap the stream in a crypto stream.
|
// File is encrypted, wrap the stream in a crypto stream.
|
||||||
|
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||||
final CryptoInputStream cryptoIn =
|
final CryptoInputStream cryptoIn =
|
||||||
new CryptoInputStream(dfsis, CryptoCodec.getInstance(conf,
|
new CryptoInputStream(dfsis, CryptoCodec.getInstance(conf,
|
||||||
feInfo.getCipherSuite()), feInfo.getEncryptedDataEncryptionKey(),
|
feInfo.getCipherSuite()), decrypted.getMaterial(),
|
||||||
feInfo.getIV());
|
feInfo.getIV());
|
||||||
return new HdfsDataInputStream(cryptoIn);
|
return new HdfsDataInputStream(cryptoIn);
|
||||||
} else {
|
} else {
|
||||||
// No key/IV pair so no encryption.
|
// No FileEncryptionInfo so no encryption.
|
||||||
return new HdfsDataInputStream(dfsis);
|
return new HdfsDataInputStream(dfsis);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1329,12 +1360,13 @@ public HdfsDataOutputStream createWrappedOutputStream(DFSOutputStream dfsos,
|
|||||||
final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
|
final FileEncryptionInfo feInfo = dfsos.getFileEncryptionInfo();
|
||||||
if (feInfo != null) {
|
if (feInfo != null) {
|
||||||
// File is encrypted, wrap the stream in a crypto stream.
|
// File is encrypted, wrap the stream in a crypto stream.
|
||||||
|
KeyVersion decrypted = decryptEncryptedDataEncryptionKey(feInfo);
|
||||||
final CryptoOutputStream cryptoOut =
|
final CryptoOutputStream cryptoOut =
|
||||||
new CryptoOutputStream(dfsos, codec,
|
new CryptoOutputStream(dfsos, codec,
|
||||||
feInfo.getEncryptedDataEncryptionKey(), feInfo.getIV(), startPos);
|
decrypted.getMaterial(), feInfo.getIV(), startPos);
|
||||||
return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
|
return new HdfsDataOutputStream(cryptoOut, statistics, startPos);
|
||||||
} else {
|
} else {
|
||||||
// No key/IV present so no encryption.
|
// No FileEncryptionInfo present so no encryption.
|
||||||
return new HdfsDataOutputStream(dfsos, statistics, startPos);
|
return new HdfsDataOutputStream(dfsos, statistics, startPos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,6 +68,9 @@
|
|||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -1695,4 +1698,39 @@ public static void assertAllResultsEqual(Collection<?> objects)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new KeyProviderCryptoExtension by wrapping the
|
||||||
|
* KeyProvider specified in the given Configuration.
|
||||||
|
*
|
||||||
|
* @param conf Configuration specifying a single, non-transient KeyProvider.
|
||||||
|
* @return new KeyProviderCryptoExtension, or null if no provider was found.
|
||||||
|
* @throws IOException if the KeyProvider is improperly specified in
|
||||||
|
* the Configuration
|
||||||
|
*/
|
||||||
|
public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
|
||||||
|
final Configuration conf) throws IOException {
|
||||||
|
final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
|
||||||
|
if (providers == null || providers.size() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (providers.size() > 1) {
|
||||||
|
StringBuilder builder = new StringBuilder();
|
||||||
|
builder.append("Found multiple KeyProviders but only one is permitted [");
|
||||||
|
String prefix = " ";
|
||||||
|
for (KeyProvider kp: providers) {
|
||||||
|
builder.append(prefix + kp.toString());
|
||||||
|
prefix = ", ";
|
||||||
|
}
|
||||||
|
builder.append("]");
|
||||||
|
throw new IOException(builder.toString());
|
||||||
|
}
|
||||||
|
KeyProviderCryptoExtension provider = KeyProviderCryptoExtension
|
||||||
|
.createKeyProviderCryptoExtension(providers.get(0));
|
||||||
|
if (provider.isTransient()) {
|
||||||
|
throw new IOException("KeyProvider " + provider.toString()
|
||||||
|
+ " was found but it is a transient provider.");
|
||||||
|
}
|
||||||
|
return provider;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -141,7 +141,6 @@
|
|||||||
import org.apache.hadoop.crypto.CryptoCodec;
|
import org.apache.hadoop.crypto.CryptoCodec;
|
||||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
|
||||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||||
import org.apache.hadoop.fs.CacheFlag;
|
import org.apache.hadoop.fs.CacheFlag;
|
||||||
import org.apache.hadoop.fs.ContentSummary;
|
import org.apache.hadoop.fs.ContentSummary;
|
||||||
@ -766,7 +765,12 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
|||||||
*/
|
*/
|
||||||
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
|
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
initializeKeyProvider(conf);
|
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
|
||||||
|
if (provider == null) {
|
||||||
|
LOG.info("No KeyProvider found.");
|
||||||
|
} else {
|
||||||
|
LOG.info("Found KeyProvider: " + provider.toString());
|
||||||
|
}
|
||||||
providerOptions = KeyProvider.options(conf);
|
providerOptions = KeyProvider.options(conf);
|
||||||
this.codec = CryptoCodec.getInstance(conf);
|
this.codec = CryptoCodec.getInstance(conf);
|
||||||
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
|
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
|
||||||
@ -926,40 +930,8 @@ void addCacheEntry(byte[] clientId, int callId) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeKeyProvider(final Configuration conf) {
|
|
||||||
try {
|
|
||||||
final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
|
|
||||||
if (providers == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (providers.size() == 0) {
|
|
||||||
LOG.info("No KeyProviders found.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (providers.size() > 1) {
|
|
||||||
final String err =
|
|
||||||
"Multiple KeyProviders found. Only one is permitted.";
|
|
||||||
LOG.error(err);
|
|
||||||
throw new RuntimeException(err);
|
|
||||||
}
|
|
||||||
provider = KeyProviderCryptoExtension
|
|
||||||
.createKeyProviderCryptoExtension(providers.get(0));
|
|
||||||
if (provider.isTransient()) {
|
|
||||||
final String err =
|
|
||||||
"A KeyProvider was found but it is a transient provider.";
|
|
||||||
LOG.error(err);
|
|
||||||
throw new RuntimeException(err);
|
|
||||||
}
|
|
||||||
LOG.info("Found KeyProvider: " + provider.toString());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Exception while initializing KeyProvider", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public KeyProvider getProvider() {
|
public KeyProviderCryptoExtension getProvider() {
|
||||||
return provider;
|
return provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +90,10 @@ public void setup() throws IOException {
|
|||||||
fcWrapper = new FileContextTestWrapper(
|
fcWrapper = new FileContextTestWrapper(
|
||||||
FileContext.getFileContext(cluster.getURI(), conf));
|
FileContext.getFileContext(cluster.getURI(), conf));
|
||||||
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
|
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
|
||||||
|
// Need to set the client's KeyProvider to the NN's for JKS,
|
||||||
|
// else the updates do not get flushed properly
|
||||||
|
fs.getClient().provider = cluster.getNameNode().getNamesystem()
|
||||||
|
.getProvider();
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
Loading…
Reference in New Issue
Block a user