From edeb2a356ad671d962764c6e2aee9f9e7d6f394c Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Thu, 16 Aug 2018 22:32:32 -0700 Subject: [PATCH] HADOOP-15655. Enhance KMS client retry behavior. Contributed by Kitti Nanasi. --- .../kms/LoadBalancingKMSClientProvider.java | 43 +++-- .../TestLoadBalancingKMSClientProvider.java | 181 +++++++++++++++++- 2 files changed, 193 insertions(+), 31 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java index 23cdc50d66..e68e8448aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java @@ -113,8 +113,8 @@ public KMSClientProvider[] getProviders() { return providers; } - private T doOp(ProviderCallable op, int currPos) - throws IOException { + private T doOp(ProviderCallable op, int currPos, + boolean isIdempotent) throws IOException { if (providers.length == 0) { throw new IOException("No providers configured !"); } @@ -143,7 +143,7 @@ private T doOp(ProviderCallable op, int currPos) } RetryAction action = null; try { - action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false); + action = retryPolicy.shouldRetry(ioe, 0, numFailovers, isIdempotent); } catch (Exception e) { if (e instanceof IOException) { throw (IOException)e; @@ -201,7 +201,7 @@ private int nextIdx() { public Token[] call(KMSClientProvider provider) throws IOException { return provider.addDelegationTokens(renewer, credentials); } - }, nextIdx()); + }, nextIdx(), false); } @Override @@ -211,7 +211,7 @@ public long renewDelegationToken(final Token token) throws IOException { public Long call(KMSClientProvider provider) throws IOException { return provider.renewDelegationToken(token); } - }, nextIdx()); + }, nextIdx(), false); } @Override @@ -222,7 +222,7 @@ public Void call(KMSClientProvider provider) throws IOException { provider.cancelDelegationToken(token); return null; } - }, nextIdx()); + }, nextIdx(), false); } // This request is sent to all providers in the load-balancing group @@ -275,7 +275,7 @@ public EncryptedKeyVersion call(KMSClientProvider provider) throws IOException, GeneralSecurityException { return provider.generateEncryptedKey(encryptionKeyName); } - }, nextIdx()); + }, nextIdx(), true); } catch (WrapperException we) { if (we.getCause() instanceof GeneralSecurityException) { throw (GeneralSecurityException) we.getCause(); @@ -295,7 +295,7 @@ public KeyVersion call(KMSClientProvider provider) throws IOException, GeneralSecurityException { return provider.decryptEncryptedKey(encryptedKeyVersion); } - }, nextIdx()); + }, nextIdx(), true); } catch (WrapperException we) { if (we.getCause() instanceof GeneralSecurityException) { throw (GeneralSecurityException) we.getCause(); @@ -315,7 +315,7 @@ public EncryptedKeyVersion call(KMSClientProvider provider) throws IOException, GeneralSecurityException { return provider.reencryptEncryptedKey(ekv); } - }, nextIdx()); + }, nextIdx(), true); } catch (WrapperException we) { if (we.getCause() instanceof GeneralSecurityException) { throw (GeneralSecurityException) we.getCause(); @@ -335,7 +335,7 @@ public Void call(KMSClientProvider provider) provider.reencryptEncryptedKeys(ekvs); return null; } - }, nextIdx()); + }, nextIdx(), true); } catch (WrapperException we) { if (we.getCause() instanceof GeneralSecurityException) { throw (GeneralSecurityException) we.getCause(); @@ -351,7 +351,7 @@ public KeyVersion getKeyVersion(final String versionName) throws IOException { public KeyVersion call(KMSClientProvider provider) throws IOException { return provider.getKeyVersion(versionName); } - }, nextIdx()); + }, nextIdx(), true); } @Override @@ -361,7 +361,7 @@ public List getKeys() throws IOException { public List call(KMSClientProvider provider) throws IOException { return provider.getKeys(); } - }, nextIdx()); + }, nextIdx(), true); } @Override @@ -371,7 +371,7 @@ public Metadata[] getKeysMetadata(final String... names) throws IOException { public Metadata[] call(KMSClientProvider provider) throws IOException { return provider.getKeysMetadata(names); } - }, nextIdx()); + }, nextIdx(), true); } @Override @@ -382,7 +382,7 @@ public List call(KMSClientProvider provider) throws IOException { return provider.getKeyVersions(name); } - }, nextIdx()); + }, nextIdx(), true); } @Override @@ -392,8 +392,9 @@ public KeyVersion getCurrentKey(final String name) throws IOException { public KeyVersion call(KMSClientProvider provider) throws IOException { return provider.getCurrentKey(name); } - }, nextIdx()); + }, nextIdx(), true); } + @Override public Metadata getMetadata(final String name) throws IOException { return doOp(new ProviderCallable() { @@ -401,7 +402,7 @@ public Metadata getMetadata(final String name) throws IOException { public Metadata call(KMSClientProvider provider) throws IOException { return provider.getMetadata(name); } - }, nextIdx()); + }, nextIdx(), true); } @Override @@ -412,7 +413,7 @@ public KeyVersion createKey(final String name, final byte[] material, public KeyVersion call(KMSClientProvider provider) throws IOException { return provider.createKey(name, material, options); } - }, nextIdx()); + }, nextIdx(), false); } @Override @@ -425,7 +426,7 @@ public KeyVersion call(KMSClientProvider provider) throws IOException, NoSuchAlgorithmException { return provider.createKey(name, options); } - }, nextIdx()); + }, nextIdx(), false); } catch (WrapperException e) { if (e.getCause() instanceof GeneralSecurityException) { throw (NoSuchAlgorithmException) e.getCause(); @@ -442,7 +443,7 @@ public Void call(KMSClientProvider provider) throws IOException { provider.deleteKey(name); return null; } - }, nextIdx()); + }, nextIdx(), false); } @Override @@ -453,7 +454,7 @@ public KeyVersion rollNewVersion(final String name, final byte[] material) public KeyVersion call(KMSClientProvider provider) throws IOException { return provider.rollNewVersion(name, material); } - }, nextIdx()); + }, nextIdx(), false); invalidateCache(name); return newVersion; } @@ -468,7 +469,7 @@ public KeyVersion call(KMSClientProvider provider) throws IOException, NoSuchAlgorithmException { return provider.rollNewVersion(name); } - }, nextIdx()); + }, nextIdx(), false); invalidateCache(name); return newVersion; } catch (WrapperException e) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java index 4e7aed9cac..058db92179 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java @@ -29,10 +29,13 @@ import java.io.IOException; import java.net.ConnectException; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.UnknownHostException; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.List; import javax.net.ssl.SSLHandshakeException; @@ -355,24 +358,27 @@ public void testWarmUpEncryptedKeysWhenOneProviderSucceeds() } /** - * Tests whether retryPolicy fails immediately, after trying each provider - * once, on encountering IOException which is not SocketException. + * Tests whether retryPolicy fails immediately on non-idempotent operations, + * after trying each provider once, + * on encountering IOException which is not SocketException. * @throws Exception */ @Test - public void testClientRetriesWithIOException() throws Exception { + public void testClientRetriesNonIdempotentOpWithIOExceptionFailsImmediately() + throws Exception { Configuration conf = new Configuration(); + final String keyName = "test"; // Setting total failover attempts to . conf.setInt( CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10); KMSClientProvider p1 = mock(KMSClientProvider.class); - when(p1.getMetadata(Mockito.anyString())) + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) .thenThrow(new IOException("p1")); KMSClientProvider p2 = mock(KMSClientProvider.class); - when(p2.getMetadata(Mockito.anyString())) + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) .thenThrow(new IOException("p2")); KMSClientProvider p3 = mock(KMSClientProvider.class); - when(p3.getMetadata(Mockito.anyString())) + when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class))) .thenThrow(new IOException("p3")); when(p1.getKMSUrl()).thenReturn("p1"); @@ -381,17 +387,61 @@ public void testClientRetriesWithIOException() throws Exception { LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( new KMSClientProvider[] {p1, p2, p3}, 0, conf); try { - kp.getMetadata("test3"); + kp.createKey(keyName, new Options(conf)); fail("Should fail since all providers threw an IOException"); } catch (Exception e) { assertTrue(e instanceof IOException); } verify(kp.getProviders()[0], Mockito.times(1)) - .getMetadata(Mockito.eq("test3")); + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); verify(kp.getProviders()[1], Mockito.times(1)) - .getMetadata(Mockito.eq("test3")); + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); verify(kp.getProviders()[2], Mockito.times(1)) - .getMetadata(Mockito.eq("test3")); + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); + } + + /** + * Tests whether retryPolicy retries on idempotent operations + * when encountering IOException. + * @throws Exception + */ + @Test + public void testClientRetriesIdempotentOpWithIOExceptionSucceedsSecondTime() + throws Exception { + Configuration conf = new Configuration(); + final String keyName = "test"; + final KeyProvider.KeyVersion keyVersion + = new KMSClientProvider.KMSKeyVersion(keyName, "v1", + new byte[0]); + // Setting total failover attempts to . + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.getCurrentKey(Mockito.anyString())) + .thenThrow(new IOException("p1")) + .thenReturn(keyVersion); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.getCurrentKey(Mockito.anyString())) + .thenThrow(new IOException("p2")); + KMSClientProvider p3 = mock(KMSClientProvider.class); + when(p3.getCurrentKey(Mockito.anyString())) + .thenThrow(new IOException("p3")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + when(p3.getKMSUrl()).thenReturn("p3"); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2, p3}, 0, conf); + + KeyProvider.KeyVersion result = kp.getCurrentKey(keyName); + + assertEquals(keyVersion, result); + verify(kp.getProviders()[0], Mockito.times(2)) + .getCurrentKey(Mockito.eq(keyName)); + verify(kp.getProviders()[1], Mockito.times(1)) + .getCurrentKey(Mockito.eq(keyName)); + verify(kp.getProviders()[2], Mockito.times(1)) + .getCurrentKey(Mockito.eq(keyName)); } /** @@ -717,4 +767,115 @@ public void testClientRetriesWithSSLHandshakeExceptionFailsAtEveryAttempt() verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName), Mockito.any(Options.class)); } + + /** + * Tests that if an idempotent operation succeeds second time after + * SocketTimeoutException, then the operation is successful. + * @throws Exception + */ + @Test + public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionSucceeds() + throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3); + final List keys = Arrays.asList("testKey"); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.getKeys()) + .thenThrow(new SocketTimeoutException("p1")) + .thenReturn(keys); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.getKeys()).thenThrow(new SocketTimeoutException("p2")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + + List result = kp.getKeys(); + assertEquals(keys, result); + verify(p1, Mockito.times(2)).getKeys(); + verify(p2, Mockito.times(1)).getKeys(); + } + + /** + * Tests that if a non idempotent operation fails at every attempt + * after SocketTimeoutException, then SocketTimeoutException is thrown. + * @throws Exception + */ + @Test + public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionFails() + throws Exception { + Configuration conf = new Configuration(); + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 2); + final String keyName = "test"; + final String exceptionMessage = "p1 exception message"; + KMSClientProvider p1 = mock(KMSClientProvider.class); + Exception originalEx = new SocketTimeoutException(exceptionMessage); + when(p1.getKeyVersions(Mockito.anyString())) + .thenThrow(originalEx); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.getKeyVersions(Mockito.anyString())) + .thenThrow(new SocketTimeoutException("p2 exception message")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2}, 0, conf); + + Exception interceptedEx = intercept(SocketTimeoutException.class, + "SocketTimeoutException: " + exceptionMessage, + ()-> kp.getKeyVersions(keyName)); + assertEquals(originalEx, interceptedEx); + + verify(p1, Mockito.times(2)) + .getKeyVersions(Mockito.eq(keyName)); + verify(p2, Mockito.times(1)) + .getKeyVersions(Mockito.eq(keyName)); + } + + /** + * Tests whether retryPolicy fails immediately on non-idempotent operations, + * after trying each provider once, on encountering SocketTimeoutException. + * @throws Exception + */ + @Test + public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails() + throws Exception { + Configuration conf = new Configuration(); + final String keyName = "test"; + // Setting total failover attempts to . + conf.setInt( + CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10); + KMSClientProvider p1 = mock(KMSClientProvider.class); + when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new SocketTimeoutException("p1")); + KMSClientProvider p2 = mock(KMSClientProvider.class); + when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new SocketTimeoutException("p2")); + KMSClientProvider p3 = mock(KMSClientProvider.class); + when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class))) + .thenThrow(new SocketTimeoutException("p3")); + + when(p1.getKMSUrl()).thenReturn("p1"); + when(p2.getKMSUrl()).thenReturn("p2"); + when(p3.getKMSUrl()).thenReturn("p3"); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new KMSClientProvider[] {p1, p2, p3}, 0, conf); + try { + kp.createKey(keyName, new Options(conf)); + fail("Should fail since all providers threw a SocketTimeoutException"); + } catch (Exception e) { + assertTrue(e instanceof SocketTimeoutException); + } + verify(kp.getProviders()[0], Mockito.times(1)) + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); + verify(kp.getProviders()[1], Mockito.times(1)) + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); + verify(kp.getProviders()[2], Mockito.times(1)) + .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); + } } \ No newline at end of file