From f1e2ceb823e92ce864f7f2f327c4c0af722b4d85 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Mon, 3 Jun 2024 09:10:06 -0700 Subject: [PATCH] HDFS-13603: Do not propagate ExecutionException while initializing EDEK queues for keys. (#6860) --- .../crypto/key/kms/KMSClientProvider.java | 6 +-- .../hadoop/crypto/key/kms/ValueQueue.java | 20 +++++++-- .../hadoop/crypto/key/TestValueQueue.java | 43 +++++++++++++++++++ ...eyGeneratorKeyProviderCryptoExtension.java | 9 +--- 4 files changed, 62 insertions(+), 16 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index f0c912224f..6ee9068ea3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -947,11 +947,7 @@ public void flush() throws IOException { @Override public void warmUpEncryptedKeys(String... keyNames) throws IOException { - try { - encKeyVersionQueue.initializeQueuesForKeys(keyNames); - } catch (ExecutionException e) { - throw new IOException(e); - } + encKeyVersionQueue.initializeQueuesForKeys(keyNames); } @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java index 58ce443146..cbf4193563 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/ValueQueue.java @@ -269,12 +269,24 @@ public ValueQueue(final int numValues, final float lowWaterMark, long expiry, * Initializes the Value Queues for the provided keys by calling the * fill Method with "numInitValues" values * @param keyNames Array of key Names - * @throws ExecutionException executionException. + * @throws IOException if initialization fails for any provided keys */ - public void initializeQueuesForKeys(String... keyNames) - throws ExecutionException { + public void initializeQueuesForKeys(String... keyNames) throws IOException { + int successfulInitializations = 0; + ExecutionException lastException = null; + for (String keyName : keyNames) { - keyQueues.get(keyName); + try { + keyQueues.get(keyName); + successfulInitializations++; + } catch (ExecutionException e) { + lastException = e; + } + } + + if (keyNames.length > 0 && successfulInitializations != keyNames.length) { + throw new IOException(String.format("Failed to initialize %s queues for the provided keys.", + keyNames.length - successfulInitializations), lastException); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java index 4805fca1d4..6bf76b6e50 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestValueQueue.java @@ -21,19 +21,27 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller; import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache; import org.junit.Assert; import org.junit.Test; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; + public class TestValueQueue { Logger LOG = LoggerFactory.getLogger(TestValueQueue.class); @@ -111,6 +119,41 @@ public void testWarmUp() throws Exception { vq.shutdown(); } + /** + * Verifies that Queue is initialized (Warmed-up) for partial keys. + */ + @Test(timeout = 30000) + public void testPartialWarmUp() throws Exception { + MockFiller filler = new MockFiller(); + ValueQueue vq = + new ValueQueue<>(10, 0.5f, 30000, 1, + SyncGenerationPolicy.ALL, filler); + + @SuppressWarnings("unchecked") + LoadingCache> kq = + (LoadingCache>) + FieldUtils.getField(ValueQueue.class, "keyQueues", true).get(vq); + + LoadingCache> + kqSpy = spy(kq); + doThrow(new ExecutionException(new Exception())).when(kqSpy).get("k2"); + FieldUtils.writeField(vq, "keyQueues", kqSpy, true); + + Assert.assertThrows(IOException.class, () -> vq.initializeQueuesForKeys("k1", "k2", "k3")); + verify(kqSpy, times(1)).get("k2"); + + FillInfo[] fillInfos = + {filler.getTop(), filler.getTop(), filler.getTop()}; + Assert.assertEquals(5, fillInfos[0].num); + Assert.assertEquals(5, fillInfos[1].num); + Assert.assertNull(fillInfos[2]); + + Assert.assertEquals(new HashSet<>(Arrays.asList("k1", "k3")), + new HashSet<>(Arrays.asList(fillInfos[0].key, + fillInfos[1].key))); + vq.shutdown(); + } + /** * Verifies that the refill task is executed after "checkInterval" if * num values below "lowWatermark" diff --git a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java index 273c673361..bc9e6d7a90 100644 --- a/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java +++ b/hadoop-common-project/hadoop-kms/src/main/java/org/apache/hadoop/crypto/key/kms/server/EagerKeyGeneratorKeyProviderCryptoExtension.java @@ -104,13 +104,8 @@ SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller() } @Override - public void warmUpEncryptedKeys(String... keyNames) throws - IOException { - try { - encKeyVersionQueue.initializeQueuesForKeys(keyNames); - } catch (ExecutionException e) { - throw new IOException(e); - } + public void warmUpEncryptedKeys(String... keyNames) throws IOException { + encKeyVersionQueue.initializeQueuesForKeys(keyNames); } @Override