HADOOP-18759. [ABFS][Backoff-Optimization] Have a Static retry policy for connection timeout. (#5881)
Contributed by Anuj Modi.
Parent: 03d9acaa86, commit: 1336c362e5
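At a glance, this change teaches the ABFS client to fall back to a constant-interval StaticRetryPolicy (instead of the existing ExponentialRetryPolicy) when a request fails with a connection timeout, and it makes the HTTP connection and read timeouts configurable. The following is a minimal sketch, not part of the commit, of how the new knobs could be set programmatically; the keys and defaults mirror the ones introduced in the diff below, and the values shown are illustrative rather than recommendations.

import org.apache.hadoop.conf.Configuration;

public class AbfsRetryTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keep the static-retry fallback for connection timeouts (default: true).
    conf.setBoolean("fs.azure.static.retry.for.connection.timeout.enabled", true);
    // Constant wait between retries after a connection timeout (default: 1000 ms).
    conf.setInt("fs.azure.static.retry.interval", 1000);
    // HTTP connection/read timeouts used when opening each connection (defaults: 2000 ms / 30000 ms).
    conf.setInt("fs.azure.http.connection.timeout", 2000);
    conf.setInt("fs.azure.http.read.timeout", 30000);
  }
}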
@@ -152,6 +152,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
private int maxBackoffInterval;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED,
DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED)
private boolean staticRetryForConnectionTimeoutEnabled;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_INTERVAL,
DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL)
private int staticRetryInterval;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
private int backoffInterval;
@@ -166,6 +174,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
private int customTokenFetchRetryCount;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT,
DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
private int httpConnectionTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT,
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -669,6 +685,14 @@ public int getMaxBackoffIntervalMilliseconds() {
return this.maxBackoffInterval;
}

public boolean getStaticRetryForConnectionTimeoutEnabled() {
return staticRetryForConnectionTimeoutEnabled;
}

public int getStaticRetryInterval() {
return staticRetryInterval;
}

public int getBackoffIntervalMilliseconds() {
return this.backoffInterval;
}
@@ -681,6 +705,14 @@ public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}

public int getHttpConnectionTimeout() {
return this.httpConnectionTimeout;
}

public int getHttpReadTimeout() {
return this.httpReadTimeout;
}

public long getAzureBlockSize() {
return this.azureBlockSize;
}
@@ -118,6 +118,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
@@ -1781,6 +1782,8 @@ private AbfsClientContext populateAbfsClientContext() {
return new AbfsClientContextBuilder()
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfiguration))
.withStaticRetryPolicy(
new StaticRetryPolicy(abfsConfiguration))
.withAbfsCounters(abfsCounters)
.withAbfsPerfTracker(abfsPerfTracker)
.build();
@@ -48,10 +48,23 @@ public final class ConfigurationKeys {
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = "fs.azure.static.retry.for.connection.timeout.enabled";
public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval";
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

/**
* Config to set HTTP Connection Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout";
/**
* Config to set HTTP Read Timeout Value for Rest Operations.
* Value: {@value}.
*/
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";

// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";
@@ -35,15 +35,28 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

private static final int SIXTY_SECONDS = 60 * 1000;
private static final int SIXTY_SECONDS = 60_000;

// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000; // 30s
public static final boolean DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true;
public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
public static final int DEFAULT_BACKOFF_INTERVAL = 3_000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

/**
* Default value of connection timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s
/**
* Default value of read timeout to be used while setting up HTTP Connection.
* Value: {@value}.
*/
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs

// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
@@ -82,6 +82,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
* AbfsClient.
@@ -93,7 +94,8 @@ public class AbfsClient implements Closeable {
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private String xMsVersion = DECEMBER_2019_API_VERSION;
private final ExponentialRetryPolicy retryPolicy;
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
@@ -131,7 +133,8 @@ private AbfsClient(final URL baseUrl,
String baseUrlString = baseUrl.toString();
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
@@ -213,8 +216,24 @@ protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}

ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy;
ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}

/**
* Returns the retry policy to be used for Abfs Rest Operation Failure.
* @param failureReason helps to decide which type of retryPolicy to be used.
* @return retry policy to be used.
*/
public AbfsRetryPolicy getRetryPolicy(final String failureReason) {
return CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason)
&& getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()
? getStaticRetryPolicy()
: getExponentialRetryPolicy();
}

SharedKeyCredentials getSharedKeyCredentials() {
@@ -25,14 +25,18 @@
public class AbfsClientContext {

private final ExponentialRetryPolicy exponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final AbfsPerfTracker abfsPerfTracker;
private final AbfsCounters abfsCounters;

AbfsClientContext(
ExponentialRetryPolicy exponentialRetryPolicy,
StaticRetryPolicy staticRetryPolicy,
AbfsPerfTracker abfsPerfTracker,
AbfsCounters abfsCounters) {
this.exponentialRetryPolicy = exponentialRetryPolicy;
this.staticRetryPolicy = staticRetryPolicy;
this.abfsPerfTracker = abfsPerfTracker;
this.abfsCounters = abfsCounters;
}
@@ -41,6 +45,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

public StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}

public AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
@@ -25,6 +25,7 @@
public class AbfsClientContextBuilder {

private ExponentialRetryPolicy exponentialRetryPolicy;
private StaticRetryPolicy staticRetryPolicy;
private AbfsPerfTracker abfsPerfTracker;
private AbfsCounters abfsCounters;

@@ -34,6 +35,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy(
return this;
}

public AbfsClientContextBuilder withStaticRetryPolicy(
final StaticRetryPolicy staticRetryPolicy) {
this.staticRetryPolicy = staticRetryPolicy;
return this;
}

public AbfsClientContextBuilder withAbfsPerfTracker(
final AbfsPerfTracker abfsPerfTracker) {
this.abfsPerfTracker = abfsPerfTracker;
@@ -52,7 +59,10 @@ public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters
*/
public AbfsClientContext build() {
//validate the values
return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker,
return new AbfsClientContext(
exponentialRetryPolicy,
staticRetryPolicy,
abfsPerfTracker,
abfsCounters);
}
}
@@ -55,9 +55,6 @@
public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);

private static final int CONNECT_TIMEOUT = 30 * 1000;
private static final int READ_TIMEOUT = 30 * 1000;

private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;

private static final int ONE_THOUSAND = 1000;
@@ -263,10 +260,12 @@ public String getMaskedEncodedUrl() {
* @param url The full URL including query string parameters.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param requestHeaders The HTTP request headers.
*
* @param connectionTimeout The Connection Timeout value to be used while establishing http connection
* @param readTimeout The Read Timeout value to be used with http connection while making a request
* @throws IOException if an error occurs.
*/
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders,
final int connectionTimeout, final int readTimeout)
throws IOException {
this.url = url;
this.method = method;
@@ -280,9 +279,8 @@ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttp
}
}

this.connection.setConnectTimeout(CONNECT_TIMEOUT);
this.connection.setReadTimeout(READ_TIMEOUT);

this.connection.setConnectTimeout(connectionTimeout);
this.connection.setReadTimeout(readTimeout);
this.connection.setRequestMethod(method);

for (AbfsHttpHeader header : requestHeaders) {
@@ -39,6 +39,7 @@
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
* The AbfsRestOperation for Rest AbfsClient.
@@ -81,6 +82,7 @@ public class AbfsRestOperation {
* AbfsRestOperation object.
*/
private String failureReason;
private AbfsRetryPolicy retryPolicy;

/**
* This variable stores the tracing context used for last Rest Operation.
@@ -162,6 +164,7 @@ String getSasToken() {
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
this.intercept = client.getIntercept();
this.retryPolicy = client.getExponentialRetryPolicy();
}

/**
@@ -232,15 +235,18 @@ void completeExecute(TracingContext tracingContext)
requestHeaders.add(httpHeader);
}

// By Default Exponential Retry Policy Will be used
retryCount = 0;
retryPolicy = client.getExponentialRetryPolicy();
LOG.debug("First execution of REST operation - {}", operationType);
while (!executeHttpOperation(retryCount, tracingContext)) {
try {
++retryCount;
tracingContext.setRetryCount(retryCount);
LOG.debug("Retrying REST operation {}. RetryCount = {}",
operationType, retryCount);
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
long retryInterval = retryPolicy.getRetryInterval(retryCount);
LOG.debug("Rest operation {} failed with failureReason: {}. Retrying with retryCount = {}, retryPolicy: {} and sleepInterval: {}",
operationType, failureReason, retryCount, retryPolicy.getAbbreviation(), retryInterval);
Thread.sleep(retryInterval);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
@@ -277,12 +283,13 @@ String getClientLatency() {
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation;
boolean wasIOExceptionThrown = false;

try {
// initialize the HTTP request and open the connection
httpOperation = createHttpOperation();
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
tracingContext.constructHeader(httpOperation, failureReason);
tracingContext.constructHeader(httpOperation, failureReason, retryPolicy.getAbbreviation());

signRequest(httpOperation, hasRequestBody ? bufferLength : 0);
@@ -318,9 +325,10 @@ private boolean executeHttpOperation(final int retryCount,
String hostname = null;
hostname = httpOperation.getHost();
failureReason = RetryReason.getAbbreviation(ex, null, null);
retryPolicy = client.getRetryPolicy(failureReason);
LOG.warn("Unknown host name: {}. Retrying to resolve the host name...",
hostname);
if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
if (!retryPolicy.shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex, retryCount);
}
return false;
@@ -330,8 +338,9 @@ private boolean executeHttpOperation(final int retryCount,
}

failureReason = RetryReason.getAbbreviation(ex, -1, "");

if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) {
retryPolicy = client.getRetryPolicy(failureReason);
wasIOExceptionThrown = true;
if (!retryPolicy.shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex, retryCount);
}
@@ -339,25 +348,37 @@ private boolean executeHttpOperation(final int retryCount,
} finally {
int status = httpOperation.getStatusCode();
/*
A status less than 300 (2xx range) or greater than or equal
to 500 (5xx range) should contribute to throttling metrics being updated.
Less than 200 or greater than or equal to 500 show failed operations. 2xx
range contributes to successful operations. 3xx range is for redirects
and 4xx range is for user errors. These should not be a part of
throttling backoff computation.
*/
boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
if (updateMetricsResponseCode) {

/*
Connection Timeout failures should not contribute to throttling
In case the current request fails with Connection Timeout we will have
ioExceptionThrown true and failure reason as CT
In case the current request failed with 5xx, failure reason will be
updated after finally block but wasIOExceptionThrown will be false;
*/
boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown;

if (updateMetricsResponseCode && !isCTFailure) {
intercept.updateMetrics(operationType, httpOperation);
}
}

LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);

if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);

if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
}
@@ -398,12 +419,16 @@ public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign)
}

/**
* Creates new object of {@link AbfsHttpOperation} with the url, method, and
* requestHeaders fields of the AbfsRestOperation object.
* Creates new object of {@link AbfsHttpOperation} with the url, method, requestHeader fields and
* timeout values as set in configuration of the AbfsRestOperation object.
*
* @return {@link AbfsHttpOperation} to be used for sending requests
*/
@VisibleForTesting
AbfsHttpOperation createHttpOperation() throws IOException {
return new AbfsHttpOperation(url, method, requestHeaders);
return new AbfsHttpOperation(url, method, requestHeaders,
client.getAbfsConfiguration().getHttpConnectionTimeout(),
client.getAbfsConfiguration().getHttpReadTimeout());
}

/**
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.net.HttpURLConnection;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;

/**
* Abstract Class for Retry policy to be used by {@link AbfsClient}
* Implementation to be used is based on retry cause.
*/
public abstract class AbfsRetryPolicy {

/**
* The maximum number of retry attempts.
*/
private final int maxRetryCount;

/**
* Retry Policy Abbreviation for logging purpose.
*/
private final String retryPolicyAbbreviation;

protected AbfsRetryPolicy(final int maxRetryCount, final String retryPolicyAbbreviation) {
this.maxRetryCount = maxRetryCount;
this.retryPolicyAbbreviation = retryPolicyAbbreviation;
}

/**
* Returns if a request should be retried based on the retry count, current response,
* and the current strategy. The valid http status code lies in the range of 1xx-5xx.
* But an invalid status code might be set due to network or timeout kind of issues.
* Such invalid status code also qualify for retry.
*
* @param retryCount The current retry attempt count.
* @param statusCode The status code of the response, or -1 for socket error.
* @return true if the request should be retried; false otherwise.
*/
public boolean shouldRetry(final int retryCount, final int statusCode) {
return retryCount < maxRetryCount
&& (statusCode < HTTP_CONTINUE
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|| (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
&& statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
&& statusCode != HttpURLConnection.HTTP_VERSION));
}

/**
* Returns backoff interval to be used for a particular retry count
* Child class should define how they want to calculate retry interval
*
* @param retryCount The current retry attempt count.
* @return backoff Interval time
*/
public abstract long getRetryInterval(int retryCount);

/**
* Returns a String value of the abbreviation
* denoting which type of retry policy is used
* @return retry policy abbreviation
*/
public String getAbbreviation() {
return retryPolicyAbbreviation;
}

/**
* Returns maximum number of retries allowed in this retry policy
* @return max retry count
*/
protected int getMaxRetryCount() {
return maxRetryCount;
}

@Override
public String toString() {
return "AbfsRetryPolicy of subtype: "
+ retryPolicyAbbreviation
+ " and max retry count: "
+ maxRetryCount;
}
}
@@ -19,17 +19,14 @@
package org.apache.hadoop.fs.azurebfs.services;

import java.util.Random;
import java.net.HttpURLConnection;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.classification.VisibleForTesting;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;

/**
* Retry policy used by AbfsClient.
* */
public class ExponentialRetryPolicy {
public class ExponentialRetryPolicy extends AbfsRetryPolicy {
/**
* Represents the default amount of time used when calculating a random delta in the exponential
* delay between retries.
@@ -78,11 +75,6 @@ public class ExponentialRetryPolicy {
*/
private final int minBackoff;

/**
* The maximum number of retry attempts.
*/
private final int retryCount;

/**
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
*/
@@ -105,38 +97,19 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) {
/**
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
*
* @param retryCount The maximum number of retry attempts.
* @param maxRetryCount The maximum number of retry attempts.
* @param minBackoff The minimum backoff time.
* @param maxBackoff The maximum backoff time.
* @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay
* between retries.
*/
public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
this.retryCount = retryCount;
public ExponentialRetryPolicy(final int maxRetryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) {
super(maxRetryCount, RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
this.minBackoff = minBackoff;
this.maxBackoff = maxBackoff;
this.deltaBackoff = deltaBackoff;
}

/**
* Returns if a request should be retried based on the retry count, current response,
* and the current strategy. The valid http status code lies in the range of 1xx-5xx.
* But an invalid status code might be set due to network or timeout kind of issues.
* Such invalid status code also qualify for retry.
*
* @param retryCount The current retry attempt count.
* @param statusCode The status code of the response, or -1 for socket error.
* @return true if the request should be retried; false otherwise.
*/
public boolean shouldRetry(final int retryCount, final int statusCode) {
return retryCount < this.retryCount
&& (statusCode < HTTP_CONTINUE
|| statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT
|| (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR
&& statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED
&& statusCode != HttpURLConnection.HTTP_VERSION));
}

/**
* Returns backoff interval between 80% and 120% of the desired backoff,
* multiply by 2^n-1 for exponential.
@@ -144,6 +117,7 @@ public boolean shouldRetry(final int retryCount, final int statusCode) {
* @param retryCount The current retry attempt count.
* @return backoff Interval time
*/
@Override
public long getRetryInterval(final int retryCount) {
final long boundedRandDelta = (int) (this.deltaBackoff * MIN_RANDOM_RATIO)
+ this.randRef.nextInt((int) (this.deltaBackoff * MAX_RANDOM_RATIO)
@@ -151,16 +125,12 @@ public long getRetryInterval(final int retryCount) {

final double incrementDelta = (Math.pow(2, retryCount - 1)) * boundedRandDelta;

final long retryInterval = (int) Math.round(Math.min(this.minBackoff + incrementDelta, maxBackoff));
final long retryInterval = (int) Math.round(Math.min(
this.minBackoff + incrementDelta, maxBackoff));

return retryInterval;
}

@VisibleForTesting
int getRetryCount() {
return this.retryCount;
}

@VisibleForTesting
int getMinBackoff() {
return this.minBackoff;
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

public final class RetryPolicyConstants {

private RetryPolicyConstants() {

}

/**
* Constant for Exponential Retry Policy Abbreviation. {@value}
*/
public static final String EXPONENTIAL_RETRY_POLICY_ABBREVIATION = "E";
/**
* Constant for Static Retry Policy Abbreviation. {@value}
*/
public static final String STATIC_RETRY_POLICY_ABBREVIATION = "S";
}
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

/**
* Retry policy used by AbfsClient for Network Errors.
* */
public class StaticRetryPolicy extends AbfsRetryPolicy {

/**
* Represents the constant retry interval to be used with Static Retry Policy
*/
private final int retryInterval;

/**
* Initializes a new instance of the {@link StaticRetryPolicy} class.
* @param conf The {@link AbfsConfiguration} from which to retrieve retry configuration.
*/
public StaticRetryPolicy(AbfsConfiguration conf) {
super(conf.getMaxIoRetries(), RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION);
this.retryInterval = conf.getStaticRetryInterval();
}

/**
* Returns a constant backoff interval independent of retry count.
*
* @param retryCount The current retry attempt count.
* @return backoff Interval time
*/
@Override
public long getRetryInterval(final int retryCount) {
return retryInterval;
}
}
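To illustrate the behavioral difference between the two policies, the sketch below computes the sleep intervals each would produce for successive retries. It is a standalone approximation, not the ABFS code itself: the constants (1 s static interval, 3 s/30 s min/max backoff) mirror the defaults added in FileSystemConfigurations above, and the exponential formula follows the 80-120% jitter described in ExponentialRetryPolicy.

import java.util.Random;

public class RetryIntervalSketch {
  private static final Random RAND = new Random();

  // Static policy: constant 1s wait, regardless of retry count.
  static long staticInterval(int retryCount) {
    return 1_000;
  }

  // Exponential policy: min(minBackoff + 2^(n-1) * jittered delta, maxBackoff),
  // with delta drawn between 80% and 120% of the 3s backoff interval.
  static long exponentialInterval(int retryCount) {
    long delta = (long) (3_000 * (0.8 + 0.4 * RAND.nextDouble()));
    double increment = Math.pow(2, retryCount - 1) * delta;
    return Math.round(Math.min(3_000 + increment, 30_000));
  }

  public static void main(String[] args) {
    for (int retry = 1; retry <= 5; retry++) {
      System.out.printf("retry %d: static=%dms exponential=%dms%n",
          retry, staticInterval(retry), exponentialInterval(retry));
    }
  }
}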
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
* The TracingContext class to correlate Store requests using unique
@@ -66,7 +67,7 @@ public class TracingContext {
/**
* If {@link #primaryRequestId} is null, this field shall be set equal
* to the last part of the {@link #clientRequestId}'s UUID
* in {@link #constructHeader(AbfsHttpOperation, String)} only on the
* in {@link #constructHeader(AbfsHttpOperation, String, String)} only on the
* first API call for an operation. Subsequent retries for that operation
* will not change this field. In case {@link #primaryRequestId} is non-null,
* this field shall not be set.
@@ -168,8 +169,10 @@ public void setListener(Listener listener) {
* connection
* @param previousFailure Failure seen before this API trigger on same operation
* from AbfsClient.
* @param retryPolicyAbbreviation Retry policy used to get retry interval before this
* API trigger on same operation from AbfsClient
*/
public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure) {
public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure, String retryPolicyAbbreviation) {
clientRequestId = UUID.randomUUID().toString();
switch (format) {
case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty
@@ -177,7 +180,7 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":"
+ getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID
+ ":" + opType + ":" + retryCount;
header = addFailureReasons(header, previousFailure);
header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation);
break;
case TWO_ID_FORMAT:
header = clientCorrelationID + ":" + clientRequestId;
@@ -217,10 +220,13 @@ private String getPrimaryRequestIdForHeader(final Boolean isRetry) {
}

private String addFailureReasons(final String header,
final String previousFailure) {
final String previousFailure, String retryPolicyAbbreviation) {
if (previousFailure == null) {
return header;
}
if (CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure) && retryPolicyAbbreviation != null) {
return String.format("%s_%s_%s", header, previousFailure, retryPolicyAbbreviation);
}
return String.format("%s_%s", header, previousFailure);
}
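As a worked example (all identifiers here are made up): with a header prefix of corrId:reqId:fsId:primaryReqId:streamId:RD:1, a retry that follows a connection-timeout failure under the static policy is now sent with the previous-request context 1_CT_S (retry count, failure abbreviation, retry-policy abbreviation), whereas any other failure keeps the existing two-part suffix, for example 1_503.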
@@ -23,6 +23,9 @@
import java.util.Arrays;
import java.util.Random;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
@@ -32,6 +35,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_READ_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
@@ -45,6 +51,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
private static final int TEST_OFFSET = 100;
private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900;
private static final int TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS = 500;
private static final int TEST_STABLE_DEFAULT_READ_TIMEOUT_MS = 30000;
private static final int TEST_UNSTABLE_READ_TIMEOUT_MS = 1;

public ITestAzureBlobFileSystemE2E() throws Exception {
super();
@@ -229,4 +238,42 @@ private void testWriteOneByteToFile(Path testFilePath) throws Exception {
FileStatus fileStatus = fs.getFileStatus(testFilePath);
assertEquals(1, fileStatus.getLen());
}

@Test
public void testHttpConnectionTimeout() throws Exception {
// Not seeing connection failures while testing with 1 ms connection
// timeout itself and on repeated TPCDS runs when cluster
// and account are in same region, 10 ms is seen stable.
// 500 ms is seen stable for cross region.
testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
TEST_STABLE_DEFAULT_READ_TIMEOUT_MS);
}

@Test(expected = InvalidAbfsRestOperationException.class)
public void testHttpReadTimeout() throws Exception {
// Small read timeout is bound to make the request fail.
testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
TEST_UNSTABLE_READ_TIMEOUT_MS);
}

public void testHttpTimeouts(int connectionTimeoutMs, int readTimeoutMs)
throws Exception {
Configuration conf = this.getRawConfiguration();
// set to small values that will cause timeouts
conf.setInt(AZURE_HTTP_CONNECTION_TIMEOUT, connectionTimeoutMs);
conf.setInt(AZURE_HTTP_READ_TIMEOUT, readTimeoutMs);
// Reduce retry count to reduce test run time
conf.setInt(AZURE_MAX_IO_RETRIES, 1);
final AzureBlobFileSystem fs = getFileSystem(conf);
Assertions.assertThat(
fs.getAbfsStore().getAbfsConfiguration().getHttpConnectionTimeout())
.describedAs("HTTP connection time should be picked from config")
.isEqualTo(connectionTimeoutMs);
Assertions.assertThat(
fs.getAbfsStore().getAbfsConfiguration().getHttpReadTimeout())
.describedAs("HTTP Read time should be picked from config")
.isEqualTo(readTimeoutMs);
Path testPath = path(methodName.getMethodName());
ContractTestUtils.createFile(fs, testPath, false, new byte[0]);
}
}
@@ -178,7 +178,7 @@ public void testListPathTracingContext() throws Exception {
TEST_CONTINUATION_TOKEN, spiedTracingContext);

// Assert that none of the API calls used the same tracing header.
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any());
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any());
}

/**
@@ -34,21 +34,11 @@
import org.apache.hadoop.fs.azurebfs.utils.Base64;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
@@ -118,15 +108,15 @@ public void testValidateFunctionsInConfigServiceImpl() throws Exception {
for (Field field : fields) {
field.setAccessible(true);
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
assertEquals(TEST_INT, abfsConfiguration.validateInt(field));
Assertions.assertThat(abfsConfiguration.validateInt(field)).isEqualTo(TEST_INT);
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
assertEquals(DEFAULT_LONG, abfsConfiguration.validateLong(field));
Assertions.assertThat(abfsConfiguration.validateLong(field)).isEqualTo(DEFAULT_LONG);
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
assertEquals("stringValue", abfsConfiguration.validateString(field));
Assertions.assertThat(abfsConfiguration.validateString(field)).isEqualTo("stringValue");
} else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) {
assertEquals(this.encodedString, abfsConfiguration.validateBase64String(field));
Assertions.assertThat(abfsConfiguration.validateBase64String(field)).isEqualTo(this.encodedString);
} else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) {
assertEquals(true, abfsConfiguration.validateBoolean(field));
Assertions.assertThat(abfsConfiguration.validateBoolean(field)).isEqualTo(true);
}
}
}
@@ -134,27 +124,54 @@ public void testValidateFunctionsInConfigServiceImpl() throws Exception {
@Test
public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception {
// test that all the ConfigurationServiceImpl annotated fields have been initialized in the constructor
assertEquals(DEFAULT_WRITE_BUFFER_SIZE, abfsConfiguration.getWriteBufferSize());
assertEquals(DEFAULT_READ_BUFFER_SIZE, abfsConfiguration.getReadBufferSize());
assertEquals(DEFAULT_MIN_BACKOFF_INTERVAL, abfsConfiguration.getMinBackoffIntervalMilliseconds());
assertEquals(DEFAULT_MAX_BACKOFF_INTERVAL, abfsConfiguration.getMaxBackoffIntervalMilliseconds());
assertEquals(DEFAULT_BACKOFF_INTERVAL, abfsConfiguration.getBackoffIntervalMilliseconds());
assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries());
assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost());
assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange());
Assertions.assertThat(abfsConfiguration.getWriteBufferSize())
.describedAs("Default value of write buffer size should be initialized")
.isEqualTo(DEFAULT_WRITE_BUFFER_SIZE);
Assertions.assertThat(abfsConfiguration.getReadBufferSize())
.describedAs("Default value of read buffer size should be initialized")
.isEqualTo(DEFAULT_READ_BUFFER_SIZE);
Assertions.assertThat(abfsConfiguration.getMinBackoffIntervalMilliseconds())
.describedAs("Default value of min backoff interval should be initialized")
.isEqualTo(DEFAULT_MIN_BACKOFF_INTERVAL);
Assertions.assertThat(abfsConfiguration.getMaxBackoffIntervalMilliseconds())
.describedAs("Default value of max backoff interval should be initialized")
.isEqualTo(DEFAULT_MAX_BACKOFF_INTERVAL);
Assertions.assertThat(abfsConfiguration.getBackoffIntervalMilliseconds())
.describedAs("Default value of backoff interval should be initialized")
.isEqualTo(DEFAULT_BACKOFF_INTERVAL);
Assertions.assertThat(abfsConfiguration.getMaxIoRetries())
.describedAs("Default value of max number of retries should be initialized")
.isEqualTo(DEFAULT_MAX_RETRY_ATTEMPTS);
Assertions.assertThat(abfsConfiguration.getAzureBlockSize())
.describedAs("Default value of azure block size should be initialized")
.isEqualTo(MAX_AZURE_BLOCK_SIZE);
Assertions.assertThat(abfsConfiguration.getAzureBlockLocationHost())
.describedAs("Default value of azure block location host should be initialized")
.isEqualTo(AZURE_BLOCK_LOCATION_HOST_DEFAULT);
Assertions.assertThat(abfsConfiguration.getReadAheadRange())
.describedAs("Default value of read ahead range should be initialized")
.isEqualTo(DEFAULT_READ_AHEAD_RANGE);
Assertions.assertThat(abfsConfiguration.getHttpConnectionTimeout())
.describedAs("Default value of http connection timeout should be initialized")
.isEqualTo(DEFAULT_HTTP_CONNECTION_TIMEOUT);
Assertions.assertThat(abfsConfiguration.getHttpReadTimeout())
.describedAs("Default value of http read timeout should be initialized")
.isEqualTo(DEFAULT_HTTP_READ_TIMEOUT);
}

@Test
public void testConfigBlockSizeInitialized() throws Exception {
// test the block size annotated field has been initialized in the constructor
assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
Assertions.assertThat(abfsConfiguration.getAzureBlockSize())
.describedAs("Default value of max azure block size should be initialized")
.isEqualTo(MAX_AZURE_BLOCK_SIZE);
}

@Test
public void testGetAccountKey() throws Exception {
String accountKey = abfsConfiguration.getStorageAccountKey();
assertEquals(this.encodedAccountKey, accountKey);
Assertions.assertThat(accountKey).describedAs("Account Key should be initialized in configs")
.isEqualTo(this.encodedAccountKey);
}

@Test(expected = KeyProviderException.class)
@@ -169,19 +186,28 @@ public void testGetAccountKeyWithNonExistingAccountName() throws Exception {
@Test
public void testSSLSocketFactoryConfiguration()
throws InvalidConfigurationValueException, IllegalAccessException, IOException {
assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default, abfsConfiguration.getPreferredSSLFactoryOption());
assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, abfsConfiguration.getPreferredSSLFactoryOption());
assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, abfsConfiguration.getPreferredSSLFactoryOption());

Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
.describedAs("By default SSL Channel Mode should be Default")
.isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default);
Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
.describedAs("By default SSL Channel Mode should be Default")
.isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);
Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption())
.describedAs("By default SSL Channel Mode should be Default")
.isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
Configuration configuration = new Configuration();
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);
AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration, accountName);
assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, localAbfsConfiguration.getPreferredSSLFactoryOption());
Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption())
.describedAs("SSL Channel Mode should be Default_JSSE as set")
.isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE);

configuration = new Configuration();
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
localAbfsConfiguration = new AbfsConfiguration(configuration, accountName);
assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption())
.describedAs("SSL Channel Mode should be OpenSSL as set")
.isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL);
}

public static AbfsConfiguration updateRetryConfigs(AbfsConfiguration abfsConfig,
@@ -51,6 +51,10 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;

public class TestTracingContext extends AbstractAbfsIntegrationTest {
private static final String[] CLIENT_CORRELATIONID_LIST = {
@@ -213,7 +217,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
0));
AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class);
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
tracingContext.constructHeader(abfsHttpOperation, null);
tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
String header = tracingContext.getHeader();
String clientRequestIdUsed = header.split(":")[1];
String[] clientRequestIdUsedParts = clientRequestIdUsed.split("-");
@@ -225,7 +229,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
1));

tracingContext.constructHeader(abfsHttpOperation, "RT");
tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
header = tracingContext.getHeader();
String primaryRequestId = header.split(":")[3];
@@ -250,7 +254,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
tracingContext.setPrimaryRequestID();
AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class);
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
tracingContext.constructHeader(abfsHttpOperation, null);
tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
String header = tracingContext.getHeader();
String assertionPrimaryId = header.split(":")[3];

@@ -260,7 +264,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
1));

tracingContext.constructHeader(abfsHttpOperation, "RT");
tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
header = tracingContext.getHeader();
String primaryRequestId = header.split(":")[3];
@ -269,4 +273,69 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin
|
||||
+ "should be equal to PrimaryRequestId in the original request.")
|
||||
.isEqualTo(assertionPrimaryId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTracingContextHeaderForRetrypolicy() throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
final String fileSystemId = fs.getFileSystemId();
|
||||
final String clientCorrelationId = fs.getClientCorrelationId();
|
||||
final TracingHeaderFormat tracingHeaderFormat = TracingHeaderFormat.ALL_ID_FORMAT;
|
||||
TracingContext tracingContext = new TracingContext(clientCorrelationId,
|
||||
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new TracingHeaderValidator(
|
||||
fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(),
|
||||
fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false,
|
||||
0));
|
||||
tracingContext.setPrimaryRequestID();
|
||||
AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class);
|
||||
Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString());
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, null, null);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, null, STATIC_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, null);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, "503", null);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, "503", STATIC_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null);
|
||||
|
||||
tracingContext.constructHeader(abfsHttpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
|
||||
checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null);
|
||||
}
|
||||
|
||||
private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) {
|
||||
String[] headerContents = header.split(":");
|
||||
String previousReqContext = headerContents[6];
|
||||
|
||||
if (expectedFailureReason != null) {
|
||||
Assertions.assertThat(previousReqContext.split("_")[1]).describedAs(
|
||||
"Failure reason Is not as expected").isEqualTo(expectedFailureReason);
|
||||
if (expectedRetryPolicyAbbreviation != null) {
|
||||
Assertions.assertThat(previousReqContext.split("_")).describedAs(
|
||||
"Retry Count, Failure Reason and Retry Policy should be present").hasSize(3);
|
||||
Assertions.assertThat(previousReqContext.split("_")[2]).describedAs(
|
||||
"Retry policy is not as expected").isEqualTo(expectedRetryPolicyAbbreviation);
|
||||
} else {
|
||||
Assertions.assertThat(previousReqContext.split("_")).describedAs(
|
||||
"Retry Count and Failure Reason should be present").hasSize(2);
|
||||
}
|
||||
} else {
|
||||
Assertions.assertThat(previousReqContext.split("_")).describedAs(
|
||||
"Only Retry Count should be present").hasSize(1);
|
||||
}
|
||||
}
|
||||
}
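Reviewer note: the helper above pins down the shape of the previous-request-context segment that constructHeader() now emits. A minimal sketch of that expectation, under the assumption that the segment sits at index 6 of the colon-separated client-request-id header; the layouts in the comments mirror the assertions above and the concrete abbreviation values are illustrative, not the real constants:

// Illustrative sketch only -- mirrors what checkHeaderForRetryPolicyAbbreviation() asserts.
// The previous-request-context token is underscore-delimited and grows with the information available:
//   retryCount                            -> no failure recorded yet
//   retryCount_failureReason              -> failure known, retry policy not recorded
//   retryCount_failureReason_retryPolicy  -> failure and the policy chosen for the retry
String header = tracingContext.getHeader();
String previousReqContext = header.split(":")[6];
String[] parts = previousReqContext.split("_");   // expected to have 1, 2 or 3 tokens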
@ -39,6 +39,9 @@
import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
import org.apache.hadoop.security.AccessControlException;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;

/**
* A mock SAS token provider implementation
*/
@ -103,7 +106,8 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app
requestBody.append(ske);
requestBody.append("</Expiry></KeyInfo>");

AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders);
AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders,
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString());
op.sendRequest(requestBuffer, 0, requestBuffer.length);

@ -26,7 +26,9 @@
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.assertj.core.api.Assertions;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -35,8 +37,12 @@
import org.apache.hadoop.util.functional.FunctionRaisingIOE;

import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
@ -46,6 +52,7 @@
* objects which are protected inside services package.
*/
public final class AbfsClientTestUtil {
private static final long ONE_SEC = 1000;

private AbfsClientTestUtil() {

@ -55,7 +62,9 @@ public static void setMockAbfsRestOperationForListPathOperation(
final AbfsClient spiedClient,
FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> functionRaisingIOE)
throws Exception {
ExponentialRetryPolicy retryPolicy = Mockito.mock(ExponentialRetryPolicy.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(ExponentialRetryPolicy.class);
StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(AbfsThrottlingIntercept.class);
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ListPaths,
@ -68,7 +77,7 @@ public static void setMockAbfsRestOperationForListPathOperation(
Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation(
eq(AbfsRestOperationType.ListPaths), any(), any(), any());

addGeneralMockBehaviourToAbfsClient(spiedClient, retryPolicy);
addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);

functionRaisingIOE.apply(httpOperation);
@ -96,28 +105,54 @@ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperat
* Adding general mock behaviour to AbfsClient to avoid any NPE occurring.
* These will avoid any network call made and will return the relevant exception or return value directly.
* @param abfsClient to be mocked
* @param retryPolicy to be mocked
* @param exponentialRetryPolicy
* @param staticRetryPolicy
* @throws IOException
*/
public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient,
final ExponentialRetryPolicy retryPolicy) throws IOException {
final ExponentialRetryPolicy exponentialRetryPolicy,
final StaticRetryPolicy staticRetryPolicy,
final AbfsThrottlingIntercept intercept) throws IOException {
Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
Mockito.doReturn("").when(abfsClient).getAccessToken();
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);

Mockito.doReturn(intercept).when(abfsClient).getIntercept();
Mockito.doNothing()
.when(intercept)
.sendingRequest(any(), nullable(AbfsCounters.class));
Mockito.doNothing().when(intercept).updateMetrics(any(), any());

Mockito.doReturn(retryPolicy).when(abfsClient).getRetryPolicy();
Mockito.doReturn(true)
.when(retryPolicy)
.shouldRetry(nullable(Integer.class), nullable(Integer.class));
Mockito.doReturn(false).when(retryPolicy).shouldRetry(0, HTTP_OK);
Mockito.doReturn(false).when(retryPolicy).shouldRetry(1, HTTP_OK);
Mockito.doReturn(false).when(retryPolicy).shouldRetry(2, HTTP_OK);
// Returning correct retry policy based on failure reason
Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getExponentialRetryPolicy();
Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy(
AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION)));

// Defining behavior of static retry policy
Mockito.doReturn(true).when(staticRetryPolicy)
.shouldRetry(nullable(Integer.class), nullable(Integer.class));
Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(1, HTTP_OK);
Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, HTTP_OK);
Mockito.doReturn(true).when(staticRetryPolicy).shouldRetry(1, HTTP_UNAVAILABLE);
// We want only two retries to occur
Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, HTTP_UNAVAILABLE);
Mockito.doReturn(STATIC_RETRY_POLICY_ABBREVIATION).when(staticRetryPolicy).getAbbreviation();
Mockito.doReturn(ONE_SEC).when(staticRetryPolicy).getRetryInterval(nullable(Integer.class));

// Defining behavior of exponential retry policy
Mockito.doReturn(true).when(exponentialRetryPolicy)
.shouldRetry(nullable(Integer.class), nullable(Integer.class));
Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(1, HTTP_OK);
Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, HTTP_OK);
Mockito.doReturn(true).when(exponentialRetryPolicy).shouldRetry(1, HTTP_UNAVAILABLE);
// We want only two retries to occur
Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, HTTP_UNAVAILABLE);
Mockito.doReturn(EXPONENTIAL_RETRY_POLICY_ABBREVIATION).when(exponentialRetryPolicy).getAbbreviation();
Mockito.doReturn(2 * ONE_SEC).when(exponentialRetryPolicy).getRetryInterval(nullable(Integer.class));

AbfsConfiguration configurations = Mockito.mock(AbfsConfiguration.class);
Mockito.doReturn(configurations).when(abfsClient).getAbfsConfiguration();
Mockito.doReturn(true).when(configurations).getStaticRetryForConnectionTimeoutEnabled();
}
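Reviewer note: with the extra parameters, a caller now supplies both policies and the throttling intercept itself. A hedged usage sketch of the wiring this helper establishes (the local names below are only for the example):

// Example wiring only -- mirrors setMockAbfsRestOperationForListPathOperation() above.
AbfsClient mockedClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialPolicy = Mockito.mock(ExponentialRetryPolicy.class);
StaticRetryPolicy staticPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept throttlingIntercept = Mockito.mock(AbfsThrottlingIntercept.class);
AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient(mockedClient, exponentialPolicy, staticPolicy, throttlingIntercept);

// After the call, connection-timeout failures resolve to the static policy, everything else to exponential.
Assertions.assertThat(mockedClient.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION)).isSameAs(staticPolicy);
Assertions.assertThat(mockedClient.getRetryPolicy("503")).isSameAs(exponentialPolicy);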

public static void hookOnRestOpsForTracingContextSingularity(AbfsClient client) {

@ -52,6 +52,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
@ -77,7 +78,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_VALUE_UNKNOWN;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;

/**
@ -365,7 +365,9 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,

when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.getAuthType()).thenReturn(currentAuthType);
when(client.getRetryPolicy()).thenReturn(
when(client.getExponentialRetryPolicy()).thenReturn(
new ExponentialRetryPolicy(1));
when(client.getRetryPolicy(any())).thenReturn(
new ExponentialRetryPolicy(1));

when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
@ -560,7 +562,7 @@ public void testExpectHundredContinue() throws Exception {
appendRequestParameters.getLength(), null));

AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url,
HTTP_METHOD_PUT, requestHeaders));
HTTP_METHOD_PUT, requestHeaders, DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT));

// Sets the expect request property if expect header is enabled.
if (appendRequestParameters.isExpectHeaderEnabled()) {

@ -53,6 +53,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION;
@ -202,7 +204,8 @@ private AbfsRestOperation getRestOperation() throws Exception {
appendRequestParameters.getoffset(),
appendRequestParameters.getLength(), null));

AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders));
AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders,
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT));

// Sets the expect request property if expect header is enabled.
if (expectHeaderEnabled) {

@ -47,7 +47,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
@ -56,9 +55,9 @@
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;

/**
* Unit test TestExponentialRetryPolicy.
* Unit test ITestExponentialRetryPolicy.
*/
public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
public class ITestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private final int maxRetryCount = 30;
private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount);
@ -68,7 +67,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private static final int ANALYSIS_PERIOD = 10000;


public TestExponentialRetryPolicy() throws Exception {
public ITestExponentialRetryPolicy() throws Exception {
super();
}

@ -86,9 +85,10 @@ public void testDifferentMaxIORetryCount() throws Exception {
@Test
public void testDefaultMaxIORetryCount() throws Exception {
AbfsConfiguration abfsConfig = getAbfsConfig();
Assert.assertEquals(
String.format("default maxIORetry count is %s.", maxRetryCount),
maxRetryCount, abfsConfig.getMaxIoRetries());
Assertions.assertThat(abfsConfig.getMaxIoRetries())
.describedAs("Max retry count should be %s", maxRetryCount)
.isEqualTo(maxRetryCount);

testMaxIOConfig(abfsConfig);
}

@ -265,7 +265,7 @@ public void testAbfsConfigConstructor() throws Exception {
ExponentialRetryPolicy template = new ExponentialRetryPolicy(
getAbfsConfig().getMaxIoRetries());
int testModifier = 1;
int expectedMaxRetries = template.getRetryCount() + testModifier;
int expectedMaxRetries = template.getMaxRetryCount() + testModifier;
int expectedMinBackoff = template.getMinBackoff() + testModifier;
int expectedMaxBackoff = template.getMaxBackoff() + testModifier;
int expectedDeltaBackoff = template.getDeltaBackoff() + testModifier;
@ -279,10 +279,18 @@ public void testAbfsConfigConstructor() throws Exception {
ExponentialRetryPolicy policy = new ExponentialRetryPolicy(
new AbfsConfiguration(config, "dummyAccountName"));

Assert.assertEquals("Max retry count was not set as expected.", expectedMaxRetries, policy.getRetryCount());
Assert.assertEquals("Min backoff interval was not set as expected.", expectedMinBackoff, policy.getMinBackoff());
Assert.assertEquals("Max backoff interval was not set as expected.", expectedMaxBackoff, policy.getMaxBackoff());
Assert.assertEquals("Delta backoff interval was not set as expected.", expectedDeltaBackoff, policy.getDeltaBackoff());
Assertions.assertThat(policy.getMaxRetryCount())
.describedAs("Max retry count was not set as expected.")
.isEqualTo(expectedMaxRetries);
Assertions.assertThat(policy.getMinBackoff())
.describedAs("Min backoff interval was not set as expected.")
.isEqualTo(expectedMinBackoff);
Assertions.assertThat(policy.getMaxBackoff())
.describedAs("Max backoff interval was not set as expected")
.isEqualTo(expectedMaxBackoff);
Assertions.assertThat(policy.getDeltaBackoff())
.describedAs("Delta backoff interval was not set as expected.")
.isEqualTo(expectedDeltaBackoff);
}

private AbfsConfiguration getAbfsConfig() throws Exception {
@ -297,14 +305,14 @@ private void testMaxIOConfig(AbfsConfiguration abfsConfig) {
int localRetryCount = 0;

while (localRetryCount < abfsConfig.getMaxIoRetries()) {
Assert.assertTrue(
"Retry should be allowed when retryCount less than max count configured.",
retryPolicy.shouldRetry(localRetryCount, -1));
Assertions.assertThat(retryPolicy.shouldRetry(localRetryCount, -1))
.describedAs("Retry should be allowed when retryCount less than max count configured.")
.isTrue();
localRetryCount++;
}

Assert.assertEquals(
"When all retries are exhausted, the retryCount will be same as max configured",
abfsConfig.getMaxIoRetries(), localRetryCount);
Assertions.assertThat(localRetryCount)
.describedAs("When all retries are exhausted, the retryCount will be same as max configured.")
.isEqualTo(abfsConfig.getMaxIoRetries());
}
}
@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;

/**
* Class to test the behavior of Static Retry policy as well as the inheritance
* between {@link AbfsRetryPolicy}, {@link ExponentialRetryPolicy}, {@link StaticRetryPolicy}
*/
public class ITestStaticRetryPolicy extends AbstractAbfsIntegrationTest {

public ITestStaticRetryPolicy() throws Exception {
super();
}

/**
* Tests for retry policy related configurations.
* Asserting that the correct retry policy is used for a given set of
* configurations including default ones
* @throws Exception
*/
@Test
public void testStaticRetryPolicyInitializationDefault() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
assertInitialization(config, StaticRetryPolicy.class);
}

@Test
public void testStaticRetryPolicyInitialization1() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true");
assertInitialization(config, StaticRetryPolicy.class);
}

@Test
public void testStaticRetryPolicyInitialization2() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "false");
assertInitialization(config, ExponentialRetryPolicy.class);
}

private void assertInitialization(Configuration config, Class retryPolicyClass) throws Exception{
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config);
AbfsClient client = fs.getAbfsStore().getClient();

// Assert that static retry policy will be used only for CT Failures
AbfsRetryPolicy retryPolicy = client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Assertions.assertThat(retryPolicy)
.describedAs("RetryPolicy Type is Not As Expected")
.isInstanceOf(retryPolicyClass);

// For all other possible values of failureReason, Exponential retry is used
retryPolicy = client.getRetryPolicy("");
assertIsExponentialRetryPolicy(retryPolicy);
retryPolicy = client.getRetryPolicy(null);
assertIsExponentialRetryPolicy(retryPolicy);
retryPolicy = client.getRetryPolicy(CONNECTION_RESET_ABBREVIATION);
assertIsExponentialRetryPolicy(retryPolicy);
}

/**
* Test to assert that static retry policy returns the same retry interval
* independent of retry count
* @throws Exception
*/
@Test
public void testStaticRetryInterval() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
long retryInterval = 1000;
int maxIoRetry = 5;
config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true");
config.set(AZURE_STATIC_RETRY_INTERVAL, "1000");
config.set(AZURE_MAX_IO_RETRIES, "5");
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config);
AbfsClient client = fs.getAbfsStore().getClient();

AbfsRetryPolicy retryPolicy = client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
assertIsStaticRetryPolicy(retryPolicy);

Assertions.assertThat(retryPolicy.shouldRetry(0, -1))
.describedAs("Should retry should be true")
.isEqualTo(true);
Assertions.assertThat(retryPolicy.getRetryInterval(0))
.describedAs("Retry Interval Value Not as expected")
.isEqualTo(retryInterval);
Assertions.assertThat(retryPolicy.getRetryInterval(1))
.describedAs("Retry Interval Value Not as expected")
.isEqualTo(retryInterval);
Assertions.assertThat(retryPolicy.getRetryInterval(2))
.describedAs("Retry Interval Value Not as expected")
.isEqualTo(retryInterval);
Assertions.assertThat(retryPolicy.getRetryInterval(3))
.describedAs("Retry Interval Value Not as expected")
.isEqualTo(retryInterval);
Assertions.assertThat(retryPolicy.shouldRetry(maxIoRetry, -1))
.describedAs("Should retry for maxretrycount should be false")
.isEqualTo(false);
}

private void assertIsExponentialRetryPolicy(AbfsRetryPolicy retryPolicy) {
Assertions.assertThat(retryPolicy)
.describedAs("Exponential Retry policy must be used")
.isInstanceOf(ExponentialRetryPolicy.class);
}

private void assertIsStaticRetryPolicy(AbfsRetryPolicy retryPolicy) {
Assertions.assertThat(retryPolicy)
.describedAs("Static Retry policy must be used")
.isInstanceOf(StaticRetryPolicy.class);
}
}
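Reviewer note: the tests above only exercise public behaviour. A minimal sketch of the contract they imply, under the assumption of a shared retry-policy base type; this is not the shipped implementation and the class names are deliberately different:

// Sketch under stated assumptions: a common retry-policy contract with one
// implementation that ignores the retry count when computing the back-off.
abstract class RetryPolicySketch {
  abstract boolean shouldRetry(int retryCount, int statusCode);
  abstract long getRetryInterval(int retryCount);
}

class StaticRetryPolicySketch extends RetryPolicySketch {
  private final long intervalMs;   // corresponds to the configured static retry interval
  private final int maxRetries;    // corresponds to the configured max IO retry count

  StaticRetryPolicySketch(long intervalMs, int maxRetries) {
    this.intervalMs = intervalMs;
    this.maxRetries = maxRetries;
  }

  @Override
  boolean shouldRetry(int retryCount, int statusCode) {
    return retryCount < maxRetries;   // simplified; the real policy also inspects the status code
  }

  @Override
  long getRetryInterval(int retryCount) {
    return intervalMs;                // same interval for every attempt, as asserted in testStaticRetryInterval()
  }
}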

@ -36,6 +36,8 @@

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;

/**
@ -74,7 +76,8 @@ public void verifyDisablingOfTracker() throws Exception {

try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller",
"disablingCallee")) {
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);
tracker.registerResult(op).registerSuccess(true);
}

@ -92,7 +95,8 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception {
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();

List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -131,7 +135,8 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception {
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();

List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -170,7 +175,8 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -205,7 +211,8 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -269,7 +276,8 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -303,7 +311,8 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@ -363,7 +372,8 @@ public void verifyNoExceptionOnInvalidInput() throws Exception {
Instant testInstant = Instant.now();
AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false);
AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true);
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

verifyNoException(abfsPerfTrackerDisabled);
verifyNoException(abfsPerfTrackerEnabled);
@ -371,7 +381,8 @@ public void verifyNoExceptionOnInvalidInput() throws Exception {

private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception {
Instant testInstant = Instant.now();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>(),
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

try (
AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);

@ -25,6 +25,7 @@
import java.time.Duration;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.assertj.core.api.Assertions;
@ -47,6 +48,8 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND;
@ -158,6 +161,10 @@ AbfsClient getMockAbfsClient() throws IOException {

// adding mock objects to current AbfsClient
AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient());
AbfsConfiguration spiedConf = Mockito.spy(fs.getAbfsStore().getAbfsConfiguration());
Mockito.doReturn(DEFAULT_HTTP_CONNECTION_TIMEOUT).when(spiedConf).getHttpConnectionTimeout();
Mockito.doReturn(DEFAULT_HTTP_READ_TIMEOUT).when(spiedConf).getHttpReadTimeout();
Mockito.doReturn(spiedConf).when(spyClient).getAbfsConfiguration();

Mockito.doAnswer(answer -> {
AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath,
@ -191,9 +198,7 @@ private void addSpyBehavior(final AbfsRestOperation spiedRestOp,
normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());

when(spiedRestOp.createHttpOperation())
.thenReturn(failingOperation)
.thenReturn(normalOp2);
Mockito.doReturn(failingOperation).doReturn(normalOp2).when(spiedRestOp).createHttpOperation();
}

/**

@ -29,6 +29,7 @@
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
@ -37,8 +38,11 @@
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToRestOpAndHttpOp;

import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
@ -54,6 +58,8 @@
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

public class TestAbfsRestOperationMockFailures {

@ -63,7 +69,7 @@ public void testClientRequestIdForConnectTimeoutRetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE);
abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
}

@Test
@ -75,7 +81,7 @@ public void testClientRequestIdForConnectAndReadTimeoutRetry()
abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION;
exceptions[1] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
abbreviations[1] = READ_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1);
}

@Test
@ -84,7 +90,7 @@ public void testClientRequestIdForReadTimeoutRetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE);
abbreviations[0] = READ_TIMEOUT_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}

@Test
@ -93,7 +99,7 @@ public void testClientRequestIdForUnknownHostRetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new UnknownHostException();
abbreviations[0] = UNKNOWN_HOST_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}

@Test
@ -102,7 +108,7 @@ public void testClientRequestIdForConnectionResetRetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new SocketTimeoutException(CONNECTION_RESET_MESSAGE + " by peer");
abbreviations[0] = CONNECTION_RESET_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}

@Test
@ -111,7 +117,7 @@ public void testClientRequestIdForUnknownSocketExRetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new SocketException("unknown");
abbreviations[0] = SOCKET_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}

@Test
@ -120,7 +126,7 @@ public void testClientRequestIdForIOERetry() throws Exception {
String[] abbreviations = new String[1];
exceptions[0] = new InterruptedIOException();
abbreviations[0] = IO_EXCEPTION_ABBREVIATION;
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1);
testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0);
}

@Test
@ -158,16 +164,115 @@ public void testClientRequestIdFor503OtherRetry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503");
}

/**
* Test for mocking the failure scenario with retry policy assertions.
* Here we will try to create a request with following life cycle:
* 1. Primary Request made fails with Connection Timeout and fall into retry loop
* 2. Retried request fails with 503 and again go for retry
* 3. Retried request fails with 503 and do not go for retry.
*
* We will try to assert that:
* 1. Correct retry policy is used to get the retry interval for each failed request
* 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used
* @throws Exception
*/

@Test
public void testRetryPolicyWithDifferentFailureReasons() throws Exception {

AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);

AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
abfsClient,
"PUT",
null,
new ArrayList<>()
));

AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);

Stubber stubber = Mockito.doThrow(new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
stubber.doNothing().when(httpOperation).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));

when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_UNAVAILABLE);

TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
Mockito.doReturn("HEAD").when(httpOperation).getMethod();
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());

try {
// Operation will fail with CT first and then 503 thereafter.
abfsRestOperation.execute(tracingContext);
} catch(AbfsRestOperationException ex) {
Assertions.assertThat(ex.getStatusCode())
.describedAs("Status Code must be HTTP_UNAVAILABLE(503)")
.isEqualTo(HTTP_UNAVAILABLE);
}

// Assert that httpOperation.processResponse was called 3 times.
// One for retry count 0
// One for retry count 1 after failing with CT
// One for retry count 2 after failing with 503
Mockito.verify(httpOperation, times(3)).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));

// Assert that Static Retry Policy was used after CT failure.
// Iteration 1 failed with CT and shouldRetry was called with retry count 0
// Before iteration 2 sleep will be computed using static retry policy and retry count 1
Mockito.verify(abfsClient, Mockito.times(1))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Mockito.verify(staticRetryPolicy, Mockito.times(1))
.shouldRetry(0, -1);
Mockito.verify(staticRetryPolicy, Mockito.times(1))
.getRetryInterval(1);
Mockito.verify(tracingContext, Mockito.times(1))
.constructHeader(httpOperation, CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION);

// Assert that exponential Retry Policy was used during second and third Iteration.
// Iteration 2 and 3 failed with 503 and should retry was called with retry count 1 and 2
// Before iteration 3 sleep will be computed using exponential retry policy and retry count 2
// Should retry with retry count 2 will return false and no further requests will be made.
Mockito.verify(abfsClient, Mockito.times(2))
.getRetryPolicy("503");
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.shouldRetry(1, HTTP_UNAVAILABLE);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.shouldRetry(2, HTTP_UNAVAILABLE);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.getRetryInterval(2);
Mockito.verify(tracingContext, Mockito.times(1))
.constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION);

// Assert that intercept.updateMetrics was called only once during second Iteration
Mockito.verify(intercept, Mockito.times(2))
.updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class));
}
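Reviewer note: the verifications above pin down the dispatch order. A hedged sketch of the control flow being asserted; this is not the actual AbfsRestOperation code, and the local names (client, failureReason, retryCount, statusCode) are illustrative:

// Sketch only: after each failed attempt the recorded failure reason selects the
// policy that decides whether to retry and how long to sleep before the next attempt.
AbfsRetryPolicy retryPolicy = client.getRetryPolicy(failureReason); // "CT" -> static policy, anything else -> exponential
if (retryPolicy.shouldRetry(retryCount, statusCode)) {
  Thread.sleep(retryPolicy.getRetryInterval(retryCount + 1));
  // the tracing header of the re-issued request carries failureReason and retryPolicy.getAbbreviation()
}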

private void testClientRequestIdForStatusRetry(int status,
String serverErrorMessage,
String keyExpected) throws Exception {

AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy retryPolicy = Mockito.mock(
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);

StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);

// Create a readfile operation that will fail
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
abfsClient,
@ -201,8 +306,7 @@ private void testClientRequestIdForStatusRetry(int status,

TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn(tracingContext)
.when(abfsRestOperation).createNewTracingContext(any());
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());

int[] count = new int[1];
count[0] = 0;
@ -213,7 +317,7 @@ private void testClientRequestIdForStatusRetry(int status,
}
count[0]++;
return null;
}).when(tracingContext).constructHeader(any(), any());
}).when(tracingContext).constructHeader(any(), any(), any());

abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(2);
@ -222,12 +326,14 @@ private void testClientRequestIdForStatusRetry(int status,

private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
String[] abbreviationsExpected,
int len) throws Exception {
int len, int numOfCTExceptions) throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy retryPolicy = Mockito.mock(
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);

StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class);
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept);

AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
@ -265,9 +371,19 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
}
count[0]++;
return null;
}).when(tracingContext).constructHeader(any(), any());
}).when(tracingContext).constructHeader(any(), any(), any());

abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(len + 1);

/**
* Assert that getRetryPolicy was called with
* failureReason CT only for Connection Timeout Cases.
* For every failed request getRetryPolicy will be called three times
* It will be called with failureReason CT for every request failing with CT
*/
Mockito.verify(abfsClient, Mockito.times(
numOfCTExceptions))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
}
}

@ -60,7 +60,7 @@ public void testDefaultOAuthTokenFetchRetryPolicy() throws Exception {
ExponentialRetryPolicy retryPolicy = abfsConfig
.getOauthTokenFetchRetryPolicy();

Assertions.assertThat(retryPolicy.getRetryCount()).describedAs(
Assertions.assertThat(retryPolicy.getMaxRetryCount()).describedAs(
"retryCount should be the default value {} as the same "
+ "is not configured",
DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@ -103,7 +103,7 @@ public void testOAuthTokenFetchRetryPolicy()
ExponentialRetryPolicy retryPolicy = abfsConfig
.getOauthTokenFetchRetryPolicy();

Assertions.assertThat(retryPolicy.getRetryCount())
Assertions.assertThat(retryPolicy.getMaxRetryCount())
.describedAs("retryCount should be {}", TEST_RETRY_COUNT)
.isEqualTo(TEST_RETRY_COUNT);
Assertions.assertThat(retryPolicy.getMinBackoff())