Hadoop 16857. ABFS: Stop CustomTokenProvider retry logic to depend on AbfsRestOp retry policy
Contributed by Sneha Vijayarajan
This commit is contained in:
parent
264e49c8f2
commit
3d69383c26
@ -110,6 +110,11 @@ public class AbfsConfiguration{
|
|||||||
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
|
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
|
||||||
private int maxIoRetries;
|
private int maxIoRetries;
|
||||||
|
|
||||||
|
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT,
|
||||||
|
MinValue = 0,
|
||||||
|
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
|
||||||
|
private int customTokenFetchRetryCount;
|
||||||
|
|
||||||
@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
||||||
MinValue = 0,
|
MinValue = 0,
|
||||||
MaxValue = MAX_AZURE_BLOCK_SIZE,
|
MaxValue = MAX_AZURE_BLOCK_SIZE,
|
||||||
@ -425,6 +430,10 @@ public int getMaxIoRetries() {
|
|||||||
return this.maxIoRetries;
|
return this.maxIoRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getCustomTokenFetchRetryCount() {
|
||||||
|
return this.customTokenFetchRetryCount;
|
||||||
|
}
|
||||||
|
|
||||||
public long getAzureBlockSize() {
|
public long getAzureBlockSize() {
|
||||||
return this.azureBlockSize;
|
return this.azureBlockSize;
|
||||||
}
|
}
|
||||||
@ -597,7 +606,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
|||||||
LOG.trace("Initializing {}", customTokenProviderClass.getName());
|
LOG.trace("Initializing {}", customTokenProviderClass.getName());
|
||||||
azureTokenProvider.initialize(rawConfig, accountName);
|
azureTokenProvider.initialize(rawConfig, accountName);
|
||||||
LOG.trace("{} init complete", customTokenProviderClass.getName());
|
LOG.trace("{} init complete", customTokenProviderClass.getName());
|
||||||
return new CustomTokenProviderAdapter(azureTokenProvider);
|
return new CustomTokenProviderAdapter(azureTokenProvider, getCustomTokenFetchRetryCount());
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -732,6 +741,11 @@ void setListMaxResults(int listMaxResults) {
|
|||||||
this.listMaxResults = listMaxResults;
|
this.listMaxResults = listMaxResults;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setMaxIoRetries(int maxIoRetries) {
|
||||||
|
this.maxIoRetries = maxIoRetries;
|
||||||
|
}
|
||||||
|
|
||||||
private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
|
private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
|
||||||
String value = getPasswordString(key);
|
String value = getPasswordString(key);
|
||||||
if (StringUtils.isBlank(value)) {
|
if (StringUtils.isBlank(value)) {
|
||||||
|
@ -36,6 +36,7 @@ public final class ConfigurationKeys {
|
|||||||
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
|
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
|
||||||
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
|
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
|
||||||
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
|
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
|
||||||
|
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";
|
||||||
|
|
||||||
// Read and write buffer sizes defined by the user
|
// Read and write buffer sizes defined by the user
|
||||||
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
||||||
|
@ -37,6 +37,7 @@ public final class FileSystemConfigurations {
|
|||||||
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
|
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
|
||||||
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
|
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
|
||||||
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
|
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
|
||||||
|
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
|
||||||
|
|
||||||
private static final int ONE_KB = 1024;
|
private static final int ONE_KB = 1024;
|
||||||
private static final int ONE_MB = ONE_KB * ONE_KB;
|
private static final int ONE_MB = ONE_KB * ONE_KB;
|
||||||
|
@ -230,12 +230,23 @@ public String getMessage() {
|
|||||||
final StringBuilder sb = new StringBuilder();
|
final StringBuilder sb = new StringBuilder();
|
||||||
sb.append("HTTP Error ");
|
sb.append("HTTP Error ");
|
||||||
sb.append(httpErrorCode);
|
sb.append(httpErrorCode);
|
||||||
sb.append("; url='").append(url).append('\'');
|
if (!url.isEmpty()) {
|
||||||
sb.append(' ');
|
sb.append("; url='").append(url).append('\'').append(' ');
|
||||||
|
}
|
||||||
|
|
||||||
sb.append(super.getMessage());
|
sb.append(super.getMessage());
|
||||||
sb.append("; requestId='").append(requestId).append('\'');
|
if (!requestId.isEmpty()) {
|
||||||
sb.append("; contentType='").append(contentType).append('\'');
|
sb.append("; requestId='").append(requestId).append('\'');
|
||||||
sb.append("; response '").append(body).append('\'');
|
}
|
||||||
|
|
||||||
|
if (!contentType.isEmpty()) {
|
||||||
|
sb.append("; contentType='").append(contentType).append('\'');
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!body.isEmpty()) {
|
||||||
|
sb.append("; response '").append(body).append('\'');
|
||||||
|
}
|
||||||
|
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
|
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
|
||||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||||
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.HttpException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provides tokens based on custom implementation, following the Adapter Design
|
* Provides tokens based on custom implementation, following the Adapter Design
|
||||||
@ -38,6 +39,7 @@
|
|||||||
public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
||||||
implements BoundDTExtension {
|
implements BoundDTExtension {
|
||||||
|
|
||||||
|
private final int fetchTokenRetryCount;
|
||||||
private CustomTokenProviderAdaptee adaptee;
|
private CustomTokenProviderAdaptee adaptee;
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
|
||||||
|
|
||||||
@ -45,17 +47,57 @@ public final class CustomTokenProviderAdapter extends AccessTokenProvider
|
|||||||
* Constructs a token provider based on the custom token provider.
|
* Constructs a token provider based on the custom token provider.
|
||||||
*
|
*
|
||||||
* @param adaptee the custom token provider
|
* @param adaptee the custom token provider
|
||||||
|
* @param customTokenFetchRetryCount max retry count for customTokenFetch
|
||||||
*/
|
*/
|
||||||
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) {
|
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee, int customTokenFetchRetryCount) {
|
||||||
Preconditions.checkNotNull(adaptee, "adaptee");
|
Preconditions.checkNotNull(adaptee, "adaptee");
|
||||||
this.adaptee = adaptee;
|
this.adaptee = adaptee;
|
||||||
|
fetchTokenRetryCount = customTokenFetchRetryCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AzureADToken refreshToken() throws IOException {
|
protected AzureADToken refreshToken() throws IOException {
|
||||||
LOG.debug("AADToken: refreshing custom based token");
|
LOG.debug("AADToken: refreshing custom based token");
|
||||||
|
|
||||||
AzureADToken azureADToken = new AzureADToken();
|
AzureADToken azureADToken = new AzureADToken();
|
||||||
azureADToken.setAccessToken(adaptee.getAccessToken());
|
|
||||||
|
String accessToken = null;
|
||||||
|
|
||||||
|
Exception ex;
|
||||||
|
boolean succeeded = false;
|
||||||
|
// Custom token providers should have their own retry policies,
|
||||||
|
// Providing a linear retry option for the the retry count
|
||||||
|
// mentioned in config "fs.azure.custom.token.fetch.retry.count"
|
||||||
|
int retryCount = fetchTokenRetryCount;
|
||||||
|
do {
|
||||||
|
ex = null;
|
||||||
|
try {
|
||||||
|
accessToken = adaptee.getAccessToken();
|
||||||
|
LOG.trace("CustomTokenProvider Access token fetch was successful with retry count {}",
|
||||||
|
(fetchTokenRetryCount - retryCount));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("CustomTokenProvider Access token fetch failed with retry count {}",
|
||||||
|
(fetchTokenRetryCount - retryCount));
|
||||||
|
ex = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
succeeded = (ex == null);
|
||||||
|
retryCount--;
|
||||||
|
} while (!succeeded && (retryCount) >= 0);
|
||||||
|
|
||||||
|
if (!succeeded) {
|
||||||
|
HttpException httpEx = new HttpException(
|
||||||
|
-1,
|
||||||
|
"",
|
||||||
|
String.format("CustomTokenProvider getAccessToken threw %s : %s",
|
||||||
|
ex.getClass().getTypeName(), ex.getMessage()),
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
""
|
||||||
|
);
|
||||||
|
throw httpEx;
|
||||||
|
}
|
||||||
|
|
||||||
|
azureADToken.setAccessToken(accessToken);
|
||||||
azureADToken.setExpiry(adaptee.getExpiryTime());
|
azureADToken.setExpiry(adaptee.getExpiryTime());
|
||||||
|
|
||||||
return azureADToken;
|
return azureADToken;
|
||||||
|
@ -539,6 +539,8 @@ token when its `getAccessToken()` method is invoked.
|
|||||||
The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee`
|
The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee`
|
||||||
and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
|
and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.
|
||||||
|
|
||||||
|
The declared class also holds responsibility to implement retry logic while fetching access tokens.
|
||||||
|
|
||||||
## <a name="technical"></a> Technical notes
|
## <a name="technical"></a> Technical notes
|
||||||
|
|
||||||
### <a name="proxy"></a> Proxy setup
|
### <a name="proxy"></a> Proxy setup
|
||||||
|
@ -79,12 +79,13 @@ public void testAbfsRestOperationExceptionFormat() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRequestRetryConfig() throws Exception {
|
public void testCustomTokenFetchRetryCount() throws Exception {
|
||||||
testRetryLogic(0);
|
testWithDifferentCustomTokenFetchRetry(0);
|
||||||
testRetryLogic(3);
|
testWithDifferentCustomTokenFetchRetry(3);
|
||||||
|
testWithDifferentCustomTokenFetchRetry(5);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRetryLogic(int numOfRetries) throws Exception {
|
public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exception {
|
||||||
AzureBlobFileSystem fs = this.getFileSystem();
|
AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
|
|
||||||
Configuration config = new Configuration(this.getRawConfiguration());
|
Configuration config = new Configuration(this.getRawConfiguration());
|
||||||
@ -93,7 +94,7 @@ public void testRetryLogic(int numOfRetries) throws Exception {
|
|||||||
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
||||||
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
||||||
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
||||||
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
|
config.set("fs.azure.custom.token.fetch.retry.count", Integer.toString(numOfRetries));
|
||||||
// Stop filesystem creation as it will lead to calls to store.
|
// Stop filesystem creation as it will lead to calls to store.
|
||||||
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
||||||
|
|
||||||
@ -110,7 +111,7 @@ public void testRetryLogic(int numOfRetries) throws Exception {
|
|||||||
// Number of retries done should be as configured
|
// Number of retries done should be as configured
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||||
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
|
+ ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries
|
||||||
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -0,0 +1,88 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.services;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test TestExponentialRetryPolicy.
|
||||||
|
*/
|
||||||
|
public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
|
private final int maxRetryCount = 30;
|
||||||
|
private final int noRetryCount = 0;
|
||||||
|
private final int retryCount = new Random().nextInt(maxRetryCount);
|
||||||
|
private final int retryCountBeyondMax = maxRetryCount + 1;
|
||||||
|
|
||||||
|
|
||||||
|
public TestExponentialRetryPolicy() throws Exception {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDifferentMaxIORetryCount() throws Exception {
|
||||||
|
AbfsConfiguration abfsConfig = getAbfsConfig();
|
||||||
|
abfsConfig.setMaxIoRetries(noRetryCount);
|
||||||
|
testMaxIOConfig(abfsConfig);
|
||||||
|
abfsConfig.setMaxIoRetries(retryCount);
|
||||||
|
testMaxIOConfig(abfsConfig);
|
||||||
|
abfsConfig.setMaxIoRetries(retryCountBeyondMax);
|
||||||
|
testMaxIOConfig(abfsConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultMaxIORetryCount() throws Exception {
|
||||||
|
AbfsConfiguration abfsConfig = getAbfsConfig();
|
||||||
|
Assert.assertTrue(
|
||||||
|
String.format("default maxIORetry count is %s.", maxRetryCount),
|
||||||
|
abfsConfig.getMaxIoRetries() == maxRetryCount);
|
||||||
|
testMaxIOConfig(abfsConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbfsConfiguration getAbfsConfig() throws Exception {
|
||||||
|
Configuration
|
||||||
|
config = new Configuration(this.getRawConfiguration());
|
||||||
|
return new AbfsConfiguration(config, "dummyAccountName");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testMaxIOConfig(AbfsConfiguration abfsConfig) {
|
||||||
|
ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(
|
||||||
|
abfsConfig.getMaxIoRetries());
|
||||||
|
int localRetryCount = 0;
|
||||||
|
|
||||||
|
while (localRetryCount < abfsConfig.getMaxIoRetries()) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Retry should be allowed when retryCount less than max count configured.",
|
||||||
|
retryPolicy.shouldRetry(localRetryCount, -1));
|
||||||
|
localRetryCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertTrue(
|
||||||
|
"When all retries are exhausted, the retryCount will be same as max configured",
|
||||||
|
localRetryCount == abfsConfig.getMaxIoRetries());
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user