diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index b894c7f14c..32ef09c966 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -849,12 +849,8 @@ public void drain(String keyName) { } @VisibleForTesting - public int getEncKeyQueueSize(String keyName) throws IOException { - try { - return encKeyVersionQueue.getSize(keyName); - } catch (ExecutionException e) { - throw new IOException(e); - } + public int getEncKeyQueueSize(String keyName) { + return encKeyVersionQueue.getSize(keyName); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java index 32451d8360..f38a6b3c98 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java @@ -18,9 +18,11 @@ package org.apache.hadoop.crypto.key.kms; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; @@ -240,13 +242,19 @@ public void drain(String keyName ) { } /** - * Get size of the Queue for keyName + * Get size of the Queue for keyName. This is only used in unit tests. * @param keyName the key name * @return int queue size - * @throws ExecutionException */ - public int getSize(String keyName) throws ExecutionException { - return keyQueues.get(keyName).size(); + public int getSize(String keyName) { + // We can't do keyQueues.get(keyName).size() here, + // since that will have the side effect of populating the cache. + Map> map = + keyQueues.getAllPresent(Arrays.asList(keyName)); + if (map.get(keyName) == null) { + return 0; + } + return map.get(keyName).size(); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java index 5eae9a006d..abc4ebf9b4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.Queue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -158,17 +157,12 @@ public void testgetAtMostPolicyALL() throws Exception { GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { - try { int size = vq.getSize("k1"); if (size != 10) { LOG.info("Current ValueQueue size is " + size); return false; } return true; - } catch (ExecutionException e) { - LOG.error("Exception when getSize.", e); - return false; - } } }, 100, 3000); Assert.assertEquals("Failed in async call.", 10, filler.getTop().num); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5eaada44e5..844fec2984 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -750,6 +750,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI; + public static final String DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY = "dfs.namenode.edekcacheloader.interval.ms"; + public static final int DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000; + public static final String DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY = "dfs.namenode.edekcacheloader.initial.delay.ms"; + public static final int DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000; // Journal-node related configs. These are read on the JN side. public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java index d1621a8bb7..8454c0411c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EncryptionZoneManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.NavigableMap; import java.util.TreeMap; @@ -380,4 +381,18 @@ BatchedListEntries listEncryptionZones(long prevId) public int getNumEncryptionZones() { return encryptionZones.size(); } + + /** + * @return a list of all key names. + */ + String[] getKeyNames() { + assert dir.hasReadLock(); + String[] ret = new String[encryptionZones.size()]; + int index = 0; + for (Map.Entry entry : encryptionZones + .entrySet()) { + ret[index] = entry.getValue().getKeyName(); + } + return ret; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java index b663415a82..bd254190eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.security.GeneralSecurityException; import java.util.AbstractMap; +import java.util.concurrent.ExecutorService; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -304,4 +305,86 @@ static boolean isInAnEZ(final FSDirectory fsd, final INodesInPath iip) fsd.readUnlock(); } } + + /** + * Proactively warm up the edek cache. We'll get all the edek key names, + * then launch up a separate thread to warm them up. + */ + static void warmUpEdekCache(final ExecutorService executor, + final FSDirectory fsd, final int delay, final int interval) { + fsd.readLock(); + try { + String[] edeks = fsd.ezManager.getKeyNames(); + executor.execute( + new EDEKCacheLoader(edeks, fsd.getProvider(), delay, interval)); + } finally { + fsd.readUnlock(); + } + } + + /** + * EDEKCacheLoader is being run in a separate thread to loop through all the + * EDEKs and warm them up in the KMS cache. + */ + static class EDEKCacheLoader implements Runnable { + private final String[] keyNames; + private final KeyProviderCryptoExtension kp; + private int initialDelay; + private int retryInterval; + + EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp, + final int delay, final int interval) { + this.keyNames = names; + this.kp = kp; + this.initialDelay = delay; + this.retryInterval = interval; + } + + @Override + public void run() { + NameNode.LOG.info("Warming up {} EDEKs... (initialDelay={}, " + + "retryInterval={})", keyNames.length, initialDelay, retryInterval); + try { + Thread.sleep(initialDelay); + } catch (InterruptedException ie) { + NameNode.LOG.info("EDEKCacheLoader interrupted before warming up."); + return; + } + + final int logCoolDown = 10000; // periodically print error log (if any) + int sinceLastLog = logCoolDown; // always print the first failure + boolean success = false; + IOException lastSeenIOE = null; + while (true) { + try { + kp.warmUpEncryptedKeys(keyNames); + NameNode.LOG + .info("Successfully warmed up {} EDEKs.", keyNames.length); + success = true; + break; + } catch (IOException ioe) { + lastSeenIOE = ioe; + if (sinceLastLog >= logCoolDown) { + NameNode.LOG.info("Failed to warm up EDEKs.", ioe); + sinceLastLog = 0; + } else { + NameNode.LOG.debug("Failed to warm up EDEKs.", ioe); + } + } + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ie) { + NameNode.LOG.info("EDEKCacheLoader interrupted during retry."); + break; + } + sinceLastLog += retryInterval; + } + if (!success) { + NameNode.LOG.warn("Unable to warm up EDEKs."); + if (lastSeenIOE != null) { + NameNode.LOG.warn("Last seen exception:", lastSeenIOE); + } + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index a5b9dc29fa..9ff4be637d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -116,6 +116,8 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -283,6 +285,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * FSNamesystem is a container of both transient @@ -425,6 +428,12 @@ private void logAuditEvent(boolean succeeded, // A daemon to periodically clean up corrupt lazyPersist files // from the name space. Daemon lazyPersistFileScrubber = null; + + // Executor to warm up EDEK cache + private ExecutorService edekCacheLoader = null; + private final int edekCacheLoaderDelay; + private final int edekCacheLoaderInterval; + /** * When an active namenode will roll its own edit log, in # edits */ @@ -787,6 +796,13 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { + " must be zero (for disable) or greater than zero."); } + this.edekCacheLoaderDelay = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT); + this.edekCacheLoaderInterval = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_KEY, + DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INTERVAL_MS_DEFAULT); + // For testing purposes, allow the DT secret manager to be started regardless // of whether security is enabled. alwaysUseDelegationTokensForTests = conf.getBoolean( @@ -1128,6 +1144,14 @@ void startActiveServices() throws IOException { cacheManager.startMonitorThread(); blockManager.getDatanodeManager().setShouldSendCachingCommands(true); + if (provider != null) { + edekCacheLoader = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Warm Up EDEK Cache Thread #%d") + .build()); + FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, + edekCacheLoaderDelay, edekCacheLoaderInterval); + } } finally { startingActiveService = false; writeUnlock(); @@ -1162,6 +1186,9 @@ void stopActiveServices() { ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor(); nnrmthread.interrupt(); } + if (edekCacheLoader != null) { + edekCacheLoader.shutdownNow(); + } if (nnEditLogRoller != null) { ((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop(); nnEditLogRoller.interrupt(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d1f78cadeb..1e87626afe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2606,6 +2606,24 @@ + + dfs.namenode.edekcacheloader.interval.ms + 1000 + When KeyProvider is configured, the interval time of warming + up edek cache on NN starts up / becomes active. All edeks will be loaded + from KMS into provider cache. The edek cache loader will try to warm up the + cache until succeed or NN leaves active state. + + + + + dfs.namenode.edekcacheloader.initial.delay.ms + 3000 + When KeyProvider is configured, the time delayed until the first + attempt to warm up edek cache on NN start up / become active. + + + dfs.namenode.inotify.max.events.per.rpc 1000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java index 6c2ce6a291..c8d98ee130 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZones.java @@ -117,10 +117,10 @@ public class TestEncryptionZones { - private Configuration conf; + protected Configuration conf; private FileSystemTestHelper fsHelper; - private MiniDFSCluster cluster; + protected MiniDFSCluster cluster; protected HdfsAdmin dfsAdmin; protected DistributedFileSystem fs; private File testRootDir; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java index 0040d7561b..59c8dd52f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestEncryptionZonesWithKMS.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertTrue; +import com.google.common.base.Supplier; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.apache.hadoop.crypto.key.kms.server.MiniKMS; import org.apache.hadoop.security.Credentials; @@ -26,10 +27,12 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import java.io.File; import java.util.Arrays; @@ -71,8 +74,10 @@ public void testCreateEZPopulatesEDEKCache() throws Exception { final Path zonePath = new Path("/TestEncryptionZone"); fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); dfsAdmin.createEncryptionZone(zonePath, TEST_KEY); - assertTrue(((KMSClientProvider)fs.getClient().getKeyProvider()). - getEncKeyQueueSize(TEST_KEY) > 0); + @SuppressWarnings("unchecked") + KMSClientProvider kcp = (KMSClientProvider) Whitebox + .getInternalState(cluster.getNamesystem().getProvider(), "extension"); + assertTrue(kcp.getEncKeyQueueSize(TEST_KEY) > 0); } @Test(timeout = 120000) @@ -92,4 +97,31 @@ public void testDelegationToken() throws Exception { Assert.assertEquals(0, tokens.length); Assert.assertEquals(2, creds.numberOfTokens()); } + + @Test(timeout = 120000) + public void testWarmupEDEKCacheOnStartup() throws Exception { + final Path zonePath = new Path("/TestEncryptionZone"); + fsWrapper.mkdir(zonePath, FsPermission.getDirDefault(), false); + dfsAdmin.createEncryptionZone(zonePath, TEST_KEY); + + @SuppressWarnings("unchecked") + KMSClientProvider spy = (KMSClientProvider) Whitebox + .getInternalState(cluster.getNamesystem().getProvider(), "extension"); + assertTrue("key queue is empty after creating encryption zone", + spy.getEncKeyQueueSize(TEST_KEY) > 0); + + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, 0); + cluster.restartNameNode(true); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + final KMSClientProvider kspy = (KMSClientProvider) Whitebox + .getInternalState(cluster.getNamesystem().getProvider(), + "extension"); + return kspy.getEncKeyQueueSize(TEST_KEY) > 0; + } + }, 1000, 60000); + } }