HDFS-13603: Do not propagate ExecutionException while initializing EDEK queues for keys. (#6860)

This commit is contained in:
Yu Zhang 2024-06-03 09:10:06 -07:00 committed by GitHub
parent 167d4c8447
commit f1e2ceb823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 62 additions and 16 deletions

View File

@ -947,11 +947,7 @@ public void flush() throws IOException {
@Override @Override
public void warmUpEncryptedKeys(String... keyNames) public void warmUpEncryptedKeys(String... keyNames)
throws IOException { throws IOException {
try { encKeyVersionQueue.initializeQueuesForKeys(keyNames);
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override

View File

@ -269,12 +269,24 @@ public ValueQueue(final int numValues, final float lowWaterMark, long expiry,
* Initializes the Value Queues for the provided keys by calling the * Initializes the Value Queues for the provided keys by calling the
* fill Method with "numInitValues" values * fill Method with "numInitValues" values
* @param keyNames Array of key Names * @param keyNames Array of key Names
* @throws ExecutionException executionException. * @throws IOException if initialization fails for any provided keys
*/ */
public void initializeQueuesForKeys(String... keyNames) public void initializeQueuesForKeys(String... keyNames) throws IOException {
throws ExecutionException { int successfulInitializations = 0;
ExecutionException lastException = null;
for (String keyName : keyNames) { for (String keyName : keyNames) {
keyQueues.get(keyName); try {
keyQueues.get(keyName);
successfulInitializations++;
} catch (ExecutionException e) {
lastException = e;
}
}
if (keyNames.length > 0 && successfulInitializations != keyNames.length) {
throw new IOException(String.format("Failed to initialize %s queues for the provided keys.",
keyNames.length - successfulInitializations), lastException);
} }
} }

View File

@ -21,19 +21,27 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller; import org.apache.hadoop.crypto.key.kms.ValueQueue.QueueRefiller;
import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy; import org.apache.hadoop.crypto.key.kms.ValueQueue.SyncGenerationPolicy;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
public class TestValueQueue { public class TestValueQueue {
Logger LOG = LoggerFactory.getLogger(TestValueQueue.class); Logger LOG = LoggerFactory.getLogger(TestValueQueue.class);
@ -111,6 +119,41 @@ public void testWarmUp() throws Exception {
vq.shutdown(); vq.shutdown();
} }
/**
* Verifies that Queue is initialized (Warmed-up) for partial keys.
*/
@Test(timeout = 30000)
public void testPartialWarmUp() throws Exception {
MockFiller filler = new MockFiller();
ValueQueue<String> vq =
new ValueQueue<>(10, 0.5f, 30000, 1,
SyncGenerationPolicy.ALL, filler);
@SuppressWarnings("unchecked")
LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>> kq =
(LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>>)
FieldUtils.getField(ValueQueue.class, "keyQueues", true).get(vq);
LoadingCache<String, LinkedBlockingQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>>
kqSpy = spy(kq);
doThrow(new ExecutionException(new Exception())).when(kqSpy).get("k2");
FieldUtils.writeField(vq, "keyQueues", kqSpy, true);
Assert.assertThrows(IOException.class, () -> vq.initializeQueuesForKeys("k1", "k2", "k3"));
verify(kqSpy, times(1)).get("k2");
FillInfo[] fillInfos =
{filler.getTop(), filler.getTop(), filler.getTop()};
Assert.assertEquals(5, fillInfos[0].num);
Assert.assertEquals(5, fillInfos[1].num);
Assert.assertNull(fillInfos[2]);
Assert.assertEquals(new HashSet<>(Arrays.asList("k1", "k3")),
new HashSet<>(Arrays.asList(fillInfos[0].key,
fillInfos[1].key)));
vq.shutdown();
}
/** /**
* Verifies that the refill task is executed after "checkInterval" if * Verifies that the refill task is executed after "checkInterval" if
* num values below "lowWatermark" * num values below "lowWatermark"

View File

@ -104,13 +104,8 @@ SyncGenerationPolicy.LOW_WATERMARK, new EncryptedQueueRefiller()
} }
@Override @Override
public void warmUpEncryptedKeys(String... keyNames) throws public void warmUpEncryptedKeys(String... keyNames) throws IOException {
IOException { encKeyVersionQueue.initializeQueuesForKeys(keyNames);
try {
encKeyVersionQueue.initializeQueuesForKeys(keyNames);
} catch (ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override