HDFS-7718. Store KeyProvider in ClientContext to avoid leaking key provider threads when using FileContext (Arun Suresh via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-02-09 20:23:23 -08:00
parent a9dc5cd706
commit 02340a24f2
11 changed files with 304 additions and 34 deletions

View File

@ -893,6 +893,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7744. Fix potential NPE in DFSInputStream after setDropBehind or
setReadahead is called (cmccabe)
HDFS-7718. Store KeyProvider in ClientContext to avoid leaking key provider
threads when using FileContext (Arun Suresh via Colin P. McCabe)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
/**
* ClientContext contains context information for a client.
@ -71,6 +72,10 @@ public class ClientContext {
*/
private final DomainSocketFactory domainSocketFactory;
/**
* Caches key Providers for the DFSClient
*/
private final KeyProviderCache keyProviderCache;
/**
* True if we should use the legacy BlockReaderLocal.
*/
@ -107,6 +112,7 @@ private ClientContext(String name, Conf conf) {
conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
this.peerCache =
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
this.keyProviderCache = new KeyProviderCache(conf.keyProviderCacheExpiryMs);
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
this.domainSocketFactory = new DomainSocketFactory(conf);
@ -138,7 +144,9 @@ public static String confAsString(Conf conf) {
append(", domainSocketDataTraffic = ").
append(conf.domainSocketDataTraffic).
append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs).
append(", keyProviderCacheExpiryMs = ").
append(conf.keyProviderCacheExpiryMs);
return builder.toString();
}
@ -195,6 +203,10 @@ public PeerCache getPeerCache() {
return peerCache;
}
public KeyProviderCache getKeyProviderCache() {
return keyProviderCache;
}
public boolean getUseLegacyBlockReaderLocal() {
return useLegacyBlockReaderLocal;
}

View File

@ -275,8 +275,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
new DFSHedgedReadMetrics();
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
@VisibleForTesting
KeyProvider provider;
private final Sampler<?> traceSampler;
/**
@ -336,6 +334,8 @@ public static class Conf {
final long shortCircuitMmapCacheRetryTimeout;
final long shortCircuitCacheStaleThresholdMs;
final long keyProviderCacheExpiryMs;
public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf);
@ -499,6 +499,10 @@ public Conf(Configuration conf) {
dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
keyProviderCacheExpiryMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
DFSConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT);
}
public boolean isUseLegacyBlockReaderLocal() {
@ -638,14 +642,6 @@ public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
provider = DFSUtil.createKeyProvider(conf);
if (LOG.isDebugEnabled()) {
if (provider == null) {
LOG.debug("No KeyProvider found.");
} else {
LOG.debug("Found KeyProvider: " + provider.toString());
}
}
int numResponseToDrop = conf.getInt(
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
@ -961,7 +957,6 @@ private void closeAllFilesBeingWritten(final boolean abort) {
*/
@Override
public synchronized void close() throws IOException {
try {
if(clientRunning) {
closeAllFilesBeingWritten(false);
clientRunning = false;
@ -969,11 +964,6 @@ public synchronized void close() throws IOException {
// close connections to the namenode
closeConnectionToNamenode();
}
} finally {
if (provider != null) {
provider.close();
}
}
}
/**
@ -1382,6 +1372,7 @@ private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo) throws IOException {
TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler);
try {
KeyProvider provider = getKeyProvider();
if (provider == null) {
throw new IOException("No KeyProvider is configured, cannot access" +
" an encrypted file");
@ -3499,12 +3490,16 @@ DFSHedgedReadMetrics getHedgedReadMetrics() {
}
public KeyProvider getKeyProvider() {
return provider;
return clientContext.getKeyProviderCache().get(conf);
}
@VisibleForTesting
public void setKeyProvider(KeyProviderCryptoExtension provider) {
this.provider = provider;
public void setKeyProvider(KeyProvider provider) {
try {
clientContext.getKeyProviderCache().setKeyProvider(conf, provider);
} catch (IOException e) {
LOG.error("Could not set KeyProvider !!", e);
}
}
/**

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
@ -769,4 +771,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1","5","25"};
public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
// Key Provider Cache Expiry
public static final String DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS =
"dfs.client.key.provider.cache.expiry";
// 10 days
public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
TimeUnit.DAYS.toMillis(10);
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
public class KeyProviderCache {
public static final Log LOG = LogFactory.getLog(KeyProviderCache.class);
private final Cache<URI, KeyProvider> cache;
public KeyProviderCache(long expiryMs) {
cache = CacheBuilder.newBuilder()
.expireAfterAccess(expiryMs, TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<URI, KeyProvider>() {
@Override
public void onRemoval(
RemovalNotification<URI, KeyProvider> notification) {
try {
notification.getValue().close();
} catch (Throwable e) {
LOG.error(
"Error closing KeyProvider with uri ["
+ notification.getKey() + "]", e);
;
}
}
})
.build();
}
public KeyProvider get(final Configuration conf) {
URI kpURI = createKeyProviderURI(conf);
if (kpURI == null) {
return null;
}
try {
return cache.get(kpURI, new Callable<KeyProvider>() {
@Override
public KeyProvider call() throws Exception {
return DFSUtil.createKeyProvider(conf);
}
});
} catch (Exception e) {
LOG.error("Could not create KeyProvider for DFSClient !!", e.getCause());
return null;
}
}
private URI createKeyProviderURI(Configuration conf) {
final String providerUriStr =
conf.get(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, null);
// No provider set in conf
if (providerUriStr == null) {
LOG.error("Could not find uri with key ["
+ DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
+ "] to create a keyProvider !!");
return null;
}
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
LOG.error("KeyProvider URI string is invalid [" + providerUriStr
+ "]!!", e.getCause());
return null;
}
return providerUri;
}
@VisibleForTesting
public void setKeyProvider(Configuration conf, KeyProvider keyProvider)
throws IOException {
URI uri = createKeyProviderURI(conf);
cache.put(uri, keyProvider);
}
}

View File

@ -38,11 +38,13 @@
import java.util.concurrent.Future;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
@ -96,7 +98,6 @@
import static org.mockito.Mockito.withSettings;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.apache.hadoop.hdfs.DFSTestUtil.verifyFilesEqual;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.assertEquals;
@ -105,6 +106,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;
@ -157,8 +159,8 @@ public void setup() throws Exception {
protected void setProvider() {
// Need to set the client's KeyProvider to the NN's for JKS,
// else the updates do not get flushed properly
fs.getClient().provider = cluster.getNameNode().getNamesystem()
.getProvider();
fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
.getProvider());
}
@After
@ -1072,7 +1074,7 @@ public void testDelegationToken() throws Exception {
addDelegationTokens(anyString(), (Credentials)any())).
thenReturn(new Token<?>[] { testToken });
dfs.getClient().provider = keyProvider;
dfs.getClient().setKeyProvider(keyProvider);
Credentials creds = new Credentials();
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);

View File

@ -80,7 +80,7 @@ public void setupCluster() throws Exception {
dfsAdmin1 = new HdfsAdmin(cluster.getURI(1), conf);
KeyProviderCryptoExtension nn0Provider =
cluster.getNameNode(0).getNamesystem().getProvider();
fs.getClient().provider = nn0Provider;
fs.getClient().setKeyProvider(nn0Provider);
}
@After

View File

@ -71,7 +71,7 @@ public void testCreateEZPopulatesEDEKCache() throws Exception {
final Path zonePath = new Path("/TestEncryptionZone");
fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false);
dfsAdmin.createEncryptionZone(zonePath, TEST_KEY);
assertTrue(((KMSClientProvider)fs.getClient().provider).
assertTrue(((KMSClientProvider)fs.getClient().getKeyProvider()).
getEncKeyQueueSize(TEST_KEY) > 0);
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.junit.Assert;
import org.junit.Test;
public class TestKeyProviderCache {
public static class DummyKeyProvider extends KeyProvider {
public DummyKeyProvider(Configuration conf) {
super(conf);
}
@Override
public KeyVersion getKeyVersion(String versionName) throws IOException {
return null;
}
@Override
public List<String> getKeys() throws IOException {
return null;
}
@Override
public List<KeyVersion> getKeyVersions(String name) throws IOException {
return null;
}
@Override
public Metadata getMetadata(String name) throws IOException {
return null;
}
@Override
public KeyVersion createKey(String name, byte[] material, Options options)
throws IOException {
return null;
}
@Override
public void deleteKey(String name) throws IOException {
}
@Override
public KeyVersion rollNewVersion(String name, byte[] material)
throws IOException {
return null;
}
@Override
public void flush() throws IOException {
}
}
public static class Factory extends KeyProviderFactory {
@Override
public KeyProvider createProvider(URI providerName, Configuration conf)
throws IOException {
if ("dummy".equals(providerName.getScheme())) {
return new DummyKeyProvider(conf);
}
return null;
}
}
@Test
public void testCache() throws Exception {
KeyProviderCache kpCache = new KeyProviderCache(10000);
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
"dummy://foo:bar@test_provider1");
KeyProvider keyProvider1 = kpCache.get(conf);
Assert.assertNotNull("Returned Key Provider is null !!", keyProvider1);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
"dummy://foo:bar@test_provider1");
KeyProvider keyProvider2 = kpCache.get(conf);
Assert.assertTrue("Different KeyProviders returned !!",
keyProvider1 == keyProvider2);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
"dummy://test_provider3");
KeyProvider keyProvider3 = kpCache.get(conf);
Assert.assertFalse("Same KeyProviders returned !!",
keyProvider1 == keyProvider3);
conf.set(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI,
"dummy://hello:there@test_provider1");
KeyProvider keyProvider4 = kpCache.get(conf);
Assert.assertFalse("Same KeyProviders returned !!",
keyProvider1 == keyProvider4);
}
}

View File

@ -81,8 +81,8 @@ public void setup() throws Exception {
dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
// Need to set the client's KeyProvider to the NN's for JKS,
// else the updates do not get flushed properly
fs.getClient().provider = cluster.getNameNode().getNamesystem()
.getProvider();
fs.getClient().setKeyProvider(cluster.getNameNode().getNamesystem()
.getProvider());
DFSTestUtil.createKey(TEST_KEY, cluster, conf);
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.hdfs.TestKeyProviderCache$Factory