From 1336c362e5b685c020b667e308f97f7bdb5b6fd8 Mon Sep 17 00:00:00 2001 From: Anuj Modi <128447756+anujmodi2021@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:31:42 -0800 Subject: [PATCH] Hadoop-18759: [ABFS][Backoff-Optimization] Have a Static retry policy for connection timeout. (#5881) Contributed By: Anuj Modi --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 32 ++++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 3 + .../azurebfs/constants/ConfigurationKeys.java | 13 ++ .../constants/FileSystemConfigurations.java | 21 ++- .../fs/azurebfs/services/AbfsClient.java | 27 +++- .../azurebfs/services/AbfsClientContext.java | 8 + .../services/AbfsClientContextBuilder.java | 12 +- .../azurebfs/services/AbfsHttpOperation.java | 14 +- .../azurebfs/services/AbfsRestOperation.java | 65 +++++--- .../fs/azurebfs/services/AbfsRetryPolicy.java | 98 +++++++++++ .../services/ExponentialRetryPolicy.java | 44 +---- .../services/RetryPolicyConstants.java | 35 ++++ .../azurebfs/services/StaticRetryPolicy.java | 52 ++++++ .../fs/azurebfs/utils/TracingContext.java | 14 +- .../azurebfs/ITestAzureBlobFileSystemE2E.java | 47 ++++++ .../ITestAzureBlobFileSystemListStatus.java | 2 +- ...TestAbfsConfigurationFieldsValidation.java | 94 +++++++---- .../fs/azurebfs/TestTracingContext.java | 77 ++++++++- .../MockDelegationSASTokenProvider.java | 6 +- .../azurebfs/services/AbfsClientTestUtil.java | 61 +++++-- .../fs/azurebfs/services/ITestAbfsClient.java | 8 +- .../services/ITestAbfsRestOperation.java | 5 +- ....java => ITestExponentialRetryPolicy.java} | 44 ++--- .../services/ITestStaticRetryPolicy.java | 142 ++++++++++++++++ .../services/TestAbfsPerfTracker.java | 29 ++-- .../services/TestAbfsRenameRetryRecovery.java | 11 +- .../TestAbfsRestOperationMockFailures.java | 152 +++++++++++++++--- .../services/TestAzureADAuthenticator.java | 4 +- 28 files changed, 935 insertions(+), 185 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java rename hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/{TestExponentialRetryPolicy.java => ITestExponentialRetryPolicy.java} (89%) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index eff8c08605..1216fe0696 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -152,6 +152,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL) private int maxBackoffInterval; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, + DefaultValue = DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED) + private boolean staticRetryForConnectionTimeoutEnabled; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_STATIC_RETRY_INTERVAL, + DefaultValue = DEFAULT_STATIC_RETRY_INTERVAL) + private int staticRetryInterval; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL, DefaultValue = DEFAULT_BACKOFF_INTERVAL) private int backoffInterval; @@ -166,6 +174,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT) private int customTokenFetchRetryCount; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT, + DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT) + private int httpConnectionTimeout; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT, + DefaultValue = DEFAULT_HTTP_READ_TIMEOUT) + private int httpReadTimeout; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT, MinValue = 0, DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS) @@ -669,6 +685,14 @@ public int getMaxBackoffIntervalMilliseconds() { return this.maxBackoffInterval; } + public boolean getStaticRetryForConnectionTimeoutEnabled() { + return staticRetryForConnectionTimeoutEnabled; + } + + public int getStaticRetryInterval() { + return staticRetryInterval; + } + public int getBackoffIntervalMilliseconds() { return this.backoffInterval; } @@ -681,6 +705,14 @@ public int getCustomTokenFetchRetryCount() { return this.customTokenFetchRetryCount; } + public int getHttpConnectionTimeout() { + return this.httpConnectionTimeout; + } + + public int getHttpReadTimeout() { + return this.httpReadTimeout; + } + public long getAzureBlockSize() { return this.azureBlockSize; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index d9693dd7e1..8ece527e56 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -118,6 +118,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; +import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.AbfsLease; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker; @@ -1781,6 +1782,8 @@ private AbfsClientContext populateAbfsClientContext() { return new AbfsClientContextBuilder() .withExponentialRetryPolicy( new ExponentialRetryPolicy(abfsConfiguration)) + .withStaticRetryPolicy( + new StaticRetryPolicy(abfsConfiguration)) .withAbfsCounters(abfsCounters) .withAbfsPerfTracker(abfsPerfTracker) .build(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index a27c757026..af60ce949f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -48,10 +48,23 @@ public final class ConfigurationKeys { // Retry strategy defined by the user public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval"; public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval"; + public static final String AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = "fs.azure.static.retry.for.connection.timeout.enabled"; + public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval"; public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count"; + /** + * Config to set HTTP Connection Timeout Value for Rest Operations. + * Value: {@value}. + */ + public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout"; + /** + * Config to set HTTP Read Timeout Value for Rest Operations. + * Value: {@value}. + */ + public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout"; + // Retry strategy for getToken calls public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries"; public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index b3825b4c53..331c9e5684 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -35,15 +35,28 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true; public static final String USER_HOME_DIRECTORY_PREFIX = "/user"; - private static final int SIXTY_SECONDS = 60 * 1000; + private static final int SIXTY_SECONDS = 60_000; // Retry parameter defaults. - public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s - public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s - public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s + public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3_000; // 3s + public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30_000; // 30s + public static final boolean DEFAULT_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED = true; + public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s + public static final int DEFAULT_BACKOFF_INTERVAL = 3_000; // 3s public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3; + /** + * Default value of connection timeout to be used while setting up HTTP Connection. + * Value: {@value}. + */ + public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 2_000; // 2s + /** + * Default value of read timeout to be used while setting up HTTP Connection. + * Value: {@value}. + */ + public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs + // Retry parameter defaults. public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5; public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 8eeb548f50..cb6f8e9ead 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -82,6 +82,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; /** * AbfsClient. @@ -93,7 +94,8 @@ public class AbfsClient implements Closeable { private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; private String xMsVersion = DECEMBER_2019_API_VERSION; - private final ExponentialRetryPolicy retryPolicy; + private final ExponentialRetryPolicy exponentialRetryPolicy; + private final StaticRetryPolicy staticRetryPolicy; private final String filesystem; private final AbfsConfiguration abfsConfiguration; private final String userAgent; @@ -131,7 +133,8 @@ private AbfsClient(final URL baseUrl, String baseUrlString = baseUrl.toString(); this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1); this.abfsConfiguration = abfsConfiguration; - this.retryPolicy = abfsClientContext.getExponentialRetryPolicy(); + this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy(); + this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy(); this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); this.authType = abfsConfiguration.getAuthType(accountName); this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration); @@ -213,8 +216,24 @@ protected AbfsPerfTracker getAbfsPerfTracker() { return abfsPerfTracker; } - ExponentialRetryPolicy getRetryPolicy() { - return retryPolicy; + ExponentialRetryPolicy getExponentialRetryPolicy() { + return exponentialRetryPolicy; + } + + StaticRetryPolicy getStaticRetryPolicy() { + return staticRetryPolicy; + } + + /** + * Returns the retry policy to be used for Abfs Rest Operation Failure. + * @param failureReason helps to decide which type of retryPolicy to be used. + * @return retry policy to be used. + */ + public AbfsRetryPolicy getRetryPolicy(final String failureReason) { + return CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) + && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled() + ? getStaticRetryPolicy() + : getExponentialRetryPolicy(); } SharedKeyCredentials getSharedKeyCredentials() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java index ad20550af7..0a5182a699 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java @@ -25,14 +25,18 @@ public class AbfsClientContext { private final ExponentialRetryPolicy exponentialRetryPolicy; + private final StaticRetryPolicy staticRetryPolicy; private final AbfsPerfTracker abfsPerfTracker; private final AbfsCounters abfsCounters; AbfsClientContext( ExponentialRetryPolicy exponentialRetryPolicy, + StaticRetryPolicy staticRetryPolicy, AbfsPerfTracker abfsPerfTracker, AbfsCounters abfsCounters) { this.exponentialRetryPolicy = exponentialRetryPolicy; + + this.staticRetryPolicy = staticRetryPolicy; this.abfsPerfTracker = abfsPerfTracker; this.abfsCounters = abfsCounters; } @@ -41,6 +45,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() { return exponentialRetryPolicy; } + public StaticRetryPolicy getStaticRetryPolicy() { + return staticRetryPolicy; + } + public AbfsPerfTracker getAbfsPerfTracker() { return abfsPerfTracker; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java index 00513f7138..ca16de6d4f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java @@ -25,6 +25,7 @@ public class AbfsClientContextBuilder { private ExponentialRetryPolicy exponentialRetryPolicy; + private StaticRetryPolicy staticRetryPolicy; private AbfsPerfTracker abfsPerfTracker; private AbfsCounters abfsCounters; @@ -34,6 +35,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy( return this; } + public AbfsClientContextBuilder withStaticRetryPolicy( + final StaticRetryPolicy staticRetryPolicy) { + this.staticRetryPolicy = staticRetryPolicy; + return this; + } + public AbfsClientContextBuilder withAbfsPerfTracker( final AbfsPerfTracker abfsPerfTracker) { this.abfsPerfTracker = abfsPerfTracker; @@ -52,7 +59,10 @@ public AbfsClientContextBuilder withAbfsCounters(final AbfsCounters abfsCounters */ public AbfsClientContext build() { //validate the values - return new AbfsClientContext(exponentialRetryPolicy, abfsPerfTracker, + return new AbfsClientContext( + exponentialRetryPolicy, + staticRetryPolicy, + abfsPerfTracker, abfsCounters); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index c0b554f607..a29eed6f42 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -55,9 +55,6 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); - private static final int CONNECT_TIMEOUT = 30 * 1000; - private static final int READ_TIMEOUT = 30 * 1000; - private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024; private static final int ONE_THOUSAND = 1000; @@ -263,10 +260,12 @@ public String getMaskedEncodedUrl() { * @param url The full URL including query string parameters. * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). * @param requestHeaders The HTTP request headers.READ_TIMEOUT - * + * @param connectionTimeout The Connection Timeout value to be used while establishing http connection + * @param readTimeout The Read Timeout value to be used with http connection while making a request * @throws IOException if an error occurs. */ - public AbfsHttpOperation(final URL url, final String method, final List requestHeaders) + public AbfsHttpOperation(final URL url, final String method, final List requestHeaders, + final int connectionTimeout, final int readTimeout) throws IOException { this.url = url; this.method = method; @@ -280,9 +279,8 @@ public AbfsHttpOperation(final URL url, final String method, final List= HttpURLConnection.HTTP_INTERNAL_ERROR); - if (updateMetricsResponseCode) { + + /* + Connection Timeout failures should not contribute to throttling + In case the current request fails with Connection Timeout we will have + ioExceptionThrown true and failure reason as CT + In case the current request failed with 5xx, failure reason will be + updated after finally block but wasIOExceptionThrown will be false; + */ + boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown; + + if (updateMetricsResponseCode && !isCTFailure) { intercept.updateMetrics(operationType, httpOperation); } } LOG.debug("HttpRequest: {}: {}", operationType, httpOperation); - if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { - int status = httpOperation.getStatusCode(); - failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); + int status = httpOperation.getStatusCode(); + failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); + retryPolicy = client.getRetryPolicy(failureReason); + + if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; } @@ -398,12 +419,16 @@ public void signRequest(final AbfsHttpOperation httpOperation, int bytesToSign) } /** - * Creates new object of {@link AbfsHttpOperation} with the url, method, and - * requestHeaders fields of the AbfsRestOperation object. + * Creates new object of {@link AbfsHttpOperation} with the url, method, requestHeader fields and + * timeout values as set in configuration of the AbfsRestOperation object. + * + * @return {@link AbfsHttpOperation} to be used for sending requests */ @VisibleForTesting AbfsHttpOperation createHttpOperation() throws IOException { - return new AbfsHttpOperation(url, method, requestHeaders); + return new AbfsHttpOperation(url, method, requestHeaders, + client.getAbfsConfiguration().getHttpConnectionTimeout(), + client.getAbfsConfiguration().getHttpReadTimeout()); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java new file mode 100644 index 0000000000..ffddd341ac --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.HttpURLConnection; + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; + +/** + * Abstract Class for Retry policy to be used by {@link AbfsClient} + * Implementation to be used is based on retry cause. + */ +public abstract class AbfsRetryPolicy { + + /** + * The maximum number of retry attempts. + */ + private final int maxRetryCount; + + /** + * Retry Policy Abbreviation for logging purpose. + */ + private final String retryPolicyAbbreviation; + + protected AbfsRetryPolicy(final int maxRetryCount, final String retryPolicyAbbreviation) { + this.maxRetryCount = maxRetryCount; + this.retryPolicyAbbreviation = retryPolicyAbbreviation; + } + + /** + * Returns if a request should be retried based on the retry count, current response, + * and the current strategy. The valid http status code lies in the range of 1xx-5xx. + * But an invalid status code might be set due to network or timeout kind of issues. + * Such invalid status code also qualify for retry. + * + * @param retryCount The current retry attempt count. + * @param statusCode The status code of the response, or -1 for socket error. + * @return true if the request should be retried; false otherwise. + */ + public boolean shouldRetry(final int retryCount, final int statusCode) { + return retryCount < maxRetryCount + && (statusCode < HTTP_CONTINUE + || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT + || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR + && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED + && statusCode != HttpURLConnection.HTTP_VERSION)); + } + + /** + * Returns backoff interval to be used for a particular retry count + * Child class should define how they want to calculate retry interval + * + * @param retryCount The current retry attempt count. + * @return backoff Interval time + */ + public abstract long getRetryInterval(int retryCount); + + /** + * Returns a String value of the abbreviation + * denoting which type of retry policy is used + * @return retry policy abbreviation + */ + public String getAbbreviation() { + return retryPolicyAbbreviation; + } + + /** + * Returns maximum number of retries allowed in this retry policy + * @return max retry count + */ + protected int getMaxRetryCount() { + return maxRetryCount; + } + + @Override + public String toString() { + return "AbfsRetryPolicy of subtype: " + + retryPolicyAbbreviation + + " and max retry count: " + + maxRetryCount; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 227bdc5fc1..f1f2bc8be3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -19,17 +19,14 @@ package org.apache.hadoop.fs.azurebfs.services; import java.util.Random; -import java.net.HttpURLConnection; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.classification.VisibleForTesting; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; - /** * Retry policy used by AbfsClient. * */ -public class ExponentialRetryPolicy { +public class ExponentialRetryPolicy extends AbfsRetryPolicy { /** * Represents the default amount of time used when calculating a random delta in the exponential * delay between retries. @@ -78,11 +75,6 @@ public class ExponentialRetryPolicy { */ private final int minBackoff; - /** - * The maximum number of retry attempts. - */ - private final int retryCount; - /** * Initializes a new instance of the {@link ExponentialRetryPolicy} class. */ @@ -105,38 +97,19 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) { /** * Initializes a new instance of the {@link ExponentialRetryPolicy} class. * - * @param retryCount The maximum number of retry attempts. + * @param maxRetryCount The maximum number of retry attempts. * @param minBackoff The minimum backoff time. * @param maxBackoff The maximum backoff time. * @param deltaBackoff The value that will be used to calculate a random delta in the exponential delay * between retries. */ - public ExponentialRetryPolicy(final int retryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) { - this.retryCount = retryCount; + public ExponentialRetryPolicy(final int maxRetryCount, final int minBackoff, final int maxBackoff, final int deltaBackoff) { + super(maxRetryCount, RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION); this.minBackoff = minBackoff; this.maxBackoff = maxBackoff; this.deltaBackoff = deltaBackoff; } - /** - * Returns if a request should be retried based on the retry count, current response, - * and the current strategy. The valid http status code lies in the range of 1xx-5xx. - * But an invalid status code might be set due to network or timeout kind of issues. - * Such invalid status code also qualify for retry. - * - * @param retryCount The current retry attempt count. - * @param statusCode The status code of the response, or -1 for socket error. - * @return true if the request should be retried; false otherwise. - */ - public boolean shouldRetry(final int retryCount, final int statusCode) { - return retryCount < this.retryCount - && (statusCode < HTTP_CONTINUE - || statusCode == HttpURLConnection.HTTP_CLIENT_TIMEOUT - || (statusCode >= HttpURLConnection.HTTP_INTERNAL_ERROR - && statusCode != HttpURLConnection.HTTP_NOT_IMPLEMENTED - && statusCode != HttpURLConnection.HTTP_VERSION)); - } - /** * Returns backoff interval between 80% and 120% of the desired backoff, * multiply by 2^n-1 for exponential. @@ -144,6 +117,7 @@ public boolean shouldRetry(final int retryCount, final int statusCode) { * @param retryCount The current retry attempt count. * @return backoff Interval time */ + @Override public long getRetryInterval(final int retryCount) { final long boundedRandDelta = (int) (this.deltaBackoff * MIN_RANDOM_RATIO) + this.randRef.nextInt((int) (this.deltaBackoff * MAX_RANDOM_RATIO) @@ -151,16 +125,12 @@ public long getRetryInterval(final int retryCount) { final double incrementDelta = (Math.pow(2, retryCount - 1)) * boundedRandDelta; - final long retryInterval = (int) Math.round(Math.min(this.minBackoff + incrementDelta, maxBackoff)); + final long retryInterval = (int) Math.round(Math.min( + this.minBackoff + incrementDelta, maxBackoff)); return retryInterval; } - @VisibleForTesting - int getRetryCount() { - return this.retryCount; - } - @VisibleForTesting int getMinBackoff() { return this.minBackoff; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java new file mode 100644 index 0000000000..d2df032e71 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryPolicyConstants.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +public final class RetryPolicyConstants { + + private RetryPolicyConstants() { + + } + + /** + * Constant for Exponential Retry Policy Abbreviation. {@value} + */ + public static final String EXPONENTIAL_RETRY_POLICY_ABBREVIATION= "E"; + /** + * Constant for Static Retry Policy Abbreviation. {@value} + */ + public static final String STATIC_RETRY_POLICY_ABBREVIATION = "S"; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java new file mode 100644 index 0000000000..7f6eb5f4dd --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/StaticRetryPolicy.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; + +/** + * Retry policy used by AbfsClient for Network Errors. + * */ +public class StaticRetryPolicy extends AbfsRetryPolicy { + + /** + * Represents the constant retry interval to be used with Static Retry Policy + */ + private final int retryInterval; + + /** + * Initializes a new instance of the {@link StaticRetryPolicy} class. + * @param conf The {@link AbfsConfiguration} from which to retrieve retry configuration. + */ + public StaticRetryPolicy(AbfsConfiguration conf) { + super(conf.getMaxIoRetries(), RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION); + this.retryInterval = conf.getStaticRetryInterval(); + } + + /** + * Returns a constant backoff interval independent of retry count; + * + * @param retryCount The current retry attempt count. + * @return backoff Interval time + */ + @Override + public long getRetryInterval(final int retryCount) { + return retryInterval; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 97864e61e0..3c54c204dd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; /** * The TracingContext class to correlate Store requests using unique @@ -66,7 +67,7 @@ public class TracingContext { /** * If {@link #primaryRequestId} is null, this field shall be set equal * to the last part of the {@link #clientRequestId}'s UUID - * in {@link #constructHeader(AbfsHttpOperation, String)} only on the + * in {@link #constructHeader(AbfsHttpOperation, String, String)} only on the * first API call for an operation. Subsequent retries for that operation * will not change this field. In case {@link #primaryRequestId} is non-null, * this field shall not be set. @@ -168,8 +169,10 @@ public void setListener(Listener listener) { * connection * @param previousFailure Failure seen before this API trigger on same operation * from AbfsClient. + * @param retryPolicyAbbreviation Retry policy used to get retry interval before this + * API trigger on same operation from AbfsClient */ - public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure) { + public void constructHeader(AbfsHttpOperation httpOperation, String previousFailure, String retryPolicyAbbreviation) { clientRequestId = UUID.randomUUID().toString(); switch (format) { case ALL_ID_FORMAT: // Optional IDs (e.g. streamId) may be empty @@ -177,7 +180,7 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail clientCorrelationID + ":" + clientRequestId + ":" + fileSystemID + ":" + getPrimaryRequestIdForHeader(retryCount > 0) + ":" + streamID + ":" + opType + ":" + retryCount; - header = addFailureReasons(header, previousFailure); + header = addFailureReasons(header, previousFailure, retryPolicyAbbreviation); break; case TWO_ID_FORMAT: header = clientCorrelationID + ":" + clientRequestId; @@ -217,10 +220,13 @@ private String getPrimaryRequestIdForHeader(final Boolean isRetry) { } private String addFailureReasons(final String header, - final String previousFailure) { + final String previousFailure, String retryPolicyAbbreviation) { if (previousFailure == null) { return header; } + if (CONNECTION_TIMEOUT_ABBREVIATION.equals(previousFailure) && retryPolicyAbbreviation != null) { + return String.format("%s_%s_%s", header, previousFailure, retryPolicyAbbreviation); + } return String.format("%s_%s", header, previousFailure); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 56016a3947..f1673a3b38 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -23,6 +23,9 @@ import java.util.Arrays; import java.util.Random; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.apache.hadoop.conf.Configuration; @@ -32,6 +35,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_READ_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists; @@ -45,6 +51,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { private static final int TEST_OFFSET = 100; private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900; + private static final int TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS = 500; + private static final int TEST_STABLE_DEFAULT_READ_TIMEOUT_MS = 30000; + private static final int TEST_UNSTABLE_READ_TIMEOUT_MS = 1; public ITestAzureBlobFileSystemE2E() throws Exception { super(); @@ -229,4 +238,42 @@ private void testWriteOneByteToFile(Path testFilePath) throws Exception { FileStatus fileStatus = fs.getFileStatus(testFilePath); assertEquals(1, fileStatus.getLen()); } + + @Test + public void testHttpConnectionTimeout() throws Exception { + // Not seeing connection failures while testing with 1 ms connection + // timeout itself and on repeated TPCDS runs when cluster + // and account are in same region, 10 ms is seen stable. + // 500 ms is seen stable for cross region. + testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS, + TEST_STABLE_DEFAULT_READ_TIMEOUT_MS); + } + + @Test(expected = InvalidAbfsRestOperationException.class) + public void testHttpReadTimeout() throws Exception { + // Small read timeout is bound to make the request fail. + testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS, + TEST_UNSTABLE_READ_TIMEOUT_MS); + } + + public void testHttpTimeouts(int connectionTimeoutMs, int readTimeoutMs) + throws Exception { + Configuration conf = this.getRawConfiguration(); + // set to small values that will cause timeouts + conf.setInt(AZURE_HTTP_CONNECTION_TIMEOUT, connectionTimeoutMs); + conf.setInt(AZURE_HTTP_READ_TIMEOUT, readTimeoutMs); + // Reduce retry count to reduce test run time + conf.setInt(AZURE_MAX_IO_RETRIES, 1); + final AzureBlobFileSystem fs = getFileSystem(conf); + Assertions.assertThat( + fs.getAbfsStore().getAbfsConfiguration().getHttpConnectionTimeout()) + .describedAs("HTTP connection time should be picked from config") + .isEqualTo(connectionTimeoutMs); + Assertions.assertThat( + fs.getAbfsStore().getAbfsConfiguration().getHttpReadTimeout()) + .describedAs("HTTP Read time should be picked from config") + .isEqualTo(readTimeoutMs); + Path testPath = path(methodName.getMethodName()); + ContractTestUtils.createFile(fs, testPath, false, new byte[0]); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java index e7f57b8af5..b374193e9b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java @@ -178,7 +178,7 @@ public void testListPathTracingContext() throws Exception { TEST_CONTINUATION_TOKEN, spiedTracingContext); // Assert that none of the API calls used the same tracing header. - Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any()); + Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any()); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java index f041f4bccd..0b7645bd24 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsConfigurationFieldsValidation.java @@ -34,21 +34,11 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import org.assertj.core.api.Assertions; import org.junit.Test; /** @@ -118,15 +108,15 @@ public void testValidateFunctionsInConfigServiceImpl() throws Exception { for (Field field : fields) { field.setAccessible(true); if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { - assertEquals(TEST_INT, abfsConfiguration.validateInt(field)); + Assertions.assertThat(abfsConfiguration.validateInt(field)).isEqualTo(TEST_INT); } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { - assertEquals(DEFAULT_LONG, abfsConfiguration.validateLong(field)); + Assertions.assertThat(abfsConfiguration.validateLong(field)).isEqualTo(DEFAULT_LONG); } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { - assertEquals("stringValue", abfsConfiguration.validateString(field)); + Assertions.assertThat(abfsConfiguration.validateString(field)).isEqualTo("stringValue"); } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { - assertEquals(this.encodedString, abfsConfiguration.validateBase64String(field)); + Assertions.assertThat(abfsConfiguration.validateBase64String(field)).isEqualTo(this.encodedString); } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { - assertEquals(true, abfsConfiguration.validateBoolean(field)); + Assertions.assertThat(abfsConfiguration.validateBoolean(field)).isEqualTo(true); } } } @@ -134,27 +124,54 @@ public void testValidateFunctionsInConfigServiceImpl() throws Exception { @Test public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception { // test that all the ConfigurationServiceImpl annotated fields have been initialized in the constructor - assertEquals(DEFAULT_WRITE_BUFFER_SIZE, abfsConfiguration.getWriteBufferSize()); - assertEquals(DEFAULT_READ_BUFFER_SIZE, abfsConfiguration.getReadBufferSize()); - assertEquals(DEFAULT_MIN_BACKOFF_INTERVAL, abfsConfiguration.getMinBackoffIntervalMilliseconds()); - assertEquals(DEFAULT_MAX_BACKOFF_INTERVAL, abfsConfiguration.getMaxBackoffIntervalMilliseconds()); - assertEquals(DEFAULT_BACKOFF_INTERVAL, abfsConfiguration.getBackoffIntervalMilliseconds()); - assertEquals(DEFAULT_MAX_RETRY_ATTEMPTS, abfsConfiguration.getMaxIoRetries()); - assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize()); - assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost()); - assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange()); + Assertions.assertThat(abfsConfiguration.getWriteBufferSize()) + .describedAs("Default value of write buffer size should be initialized") + .isEqualTo(DEFAULT_WRITE_BUFFER_SIZE); + Assertions.assertThat(abfsConfiguration.getReadBufferSize()) + .describedAs("Default value of read buffer size should be initialized") + .isEqualTo(DEFAULT_READ_BUFFER_SIZE); + Assertions.assertThat(abfsConfiguration.getMinBackoffIntervalMilliseconds()) + .describedAs("Default value of min backoff interval should be initialized") + .isEqualTo(DEFAULT_MIN_BACKOFF_INTERVAL); + Assertions.assertThat(abfsConfiguration.getMaxBackoffIntervalMilliseconds()) + .describedAs("Default value of max backoff interval should be initialized") + .isEqualTo(DEFAULT_MAX_BACKOFF_INTERVAL); + Assertions.assertThat(abfsConfiguration.getBackoffIntervalMilliseconds()) + .describedAs("Default value of backoff interval should be initialized") + .isEqualTo(DEFAULT_BACKOFF_INTERVAL); + Assertions.assertThat(abfsConfiguration.getMaxIoRetries()) + .describedAs("Default value of max number of retries should be initialized") + .isEqualTo(DEFAULT_MAX_RETRY_ATTEMPTS); + Assertions.assertThat(abfsConfiguration.getAzureBlockSize()) + .describedAs("Default value of azure block size should be initialized") + .isEqualTo(MAX_AZURE_BLOCK_SIZE); + Assertions.assertThat(abfsConfiguration.getAzureBlockLocationHost()) + .describedAs("Default value of azure block location host should be initialized") + .isEqualTo(AZURE_BLOCK_LOCATION_HOST_DEFAULT); + Assertions.assertThat(abfsConfiguration.getReadAheadRange()) + .describedAs("Default value of read ahead range should be initialized") + .isEqualTo(DEFAULT_READ_AHEAD_RANGE); + Assertions.assertThat(abfsConfiguration.getHttpConnectionTimeout()) + .describedAs("Default value of http connection timeout should be initialized") + .isEqualTo(DEFAULT_HTTP_CONNECTION_TIMEOUT); + Assertions.assertThat(abfsConfiguration.getHttpReadTimeout()) + .describedAs("Default value of http read timeout should be initialized") + .isEqualTo(DEFAULT_HTTP_READ_TIMEOUT); } @Test public void testConfigBlockSizeInitialized() throws Exception { // test the block size annotated field has been initialized in the constructor - assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize()); + Assertions.assertThat(abfsConfiguration.getAzureBlockSize()) + .describedAs("Default value of max azure block size should be initialized") + .isEqualTo(MAX_AZURE_BLOCK_SIZE); } @Test public void testGetAccountKey() throws Exception { String accountKey = abfsConfiguration.getStorageAccountKey(); - assertEquals(this.encodedAccountKey, accountKey); + Assertions.assertThat(accountKey).describedAs("Account Key should be initialized in configs") + .isEqualTo(this.encodedAccountKey); } @Test(expected = KeyProviderException.class) @@ -169,19 +186,28 @@ public void testGetAccountKeyWithNonExistingAccountName() throws Exception { @Test public void testSSLSocketFactoryConfiguration() throws InvalidConfigurationValueException, IllegalAccessException, IOException { - assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default, abfsConfiguration.getPreferredSSLFactoryOption()); - assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, abfsConfiguration.getPreferredSSLFactoryOption()); - assertNotEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, abfsConfiguration.getPreferredSSLFactoryOption()); - + Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption()) + .describedAs("By default SSL Channel Mode should be Default") + .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default); + Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption()) + .describedAs("By default SSL Channel Mode should be Default") + .isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE); + Assertions.assertThat(abfsConfiguration.getPreferredSSLFactoryOption()) + .describedAs("By default SSL Channel Mode should be Default") + .isNotEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL); Configuration configuration = new Configuration(); configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE); AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration, accountName); - assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE, localAbfsConfiguration.getPreferredSSLFactoryOption()); + Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption()) + .describedAs("SSL Channel Mode should be Default_JSSE as set") + .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.Default_JSSE); configuration = new Configuration(); configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL); localAbfsConfiguration = new AbfsConfiguration(configuration, accountName); - assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption()); + Assertions.assertThat(localAbfsConfiguration.getPreferredSSLFactoryOption()) + .describedAs("SSL Channel Mode should be OpenSSL as set") + .isEqualTo(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL); } public static AbfsConfiguration updateRetryConfigs(AbfsConfiguration abfsConfig, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java index 0a19a24de3..3ffa2bd49e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java @@ -51,6 +51,10 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION; public class TestTracingContext extends AbstractAbfsIntegrationTest { private static final String[] CLIENT_CORRELATIONID_LIST = { @@ -213,7 +217,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin 0)); AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class); Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString()); - tracingContext.constructHeader(abfsHttpOperation, null); + tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); String header = tracingContext.getHeader(); String clientRequestIdUsed = header.split(":")[1]; String[] clientRequestIdUsedParts = clientRequestIdUsed.split("-"); @@ -225,7 +229,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false, 1)); - tracingContext.constructHeader(abfsHttpOperation, "RT"); + tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); header = tracingContext.getHeader(); String primaryRequestId = header.split(":")[3]; @@ -250,7 +254,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin tracingContext.setPrimaryRequestID(); AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class); Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString()); - tracingContext.constructHeader(abfsHttpOperation, null); + tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); String header = tracingContext.getHeader(); String assertionPrimaryId = header.split(":")[3]; @@ -260,7 +264,7 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false, 1)); - tracingContext.constructHeader(abfsHttpOperation, "RT"); + tracingContext.constructHeader(abfsHttpOperation, READ_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); header = tracingContext.getHeader(); String primaryRequestId = header.split(":")[3]; @@ -269,4 +273,69 @@ fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new Tracin + "should be equal to PrimaryRequestId in the original request.") .isEqualTo(assertionPrimaryId); } + + @Test + public void testTracingContextHeaderForRetrypolicy() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final String fileSystemId = fs.getFileSystemId(); + final String clientCorrelationId = fs.getClientCorrelationId(); + final TracingHeaderFormat tracingHeaderFormat = TracingHeaderFormat.ALL_ID_FORMAT; + TracingContext tracingContext = new TracingContext(clientCorrelationId, + fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, new TracingHeaderValidator( + fs.getAbfsStore().getAbfsConfiguration().getClientCorrelationId(), + fs.getFileSystemId(), FSOperationType.CREATE_FILESYSTEM, false, + 0)); + tracingContext.setPrimaryRequestID(); + AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class); + Mockito.doNothing().when(abfsHttpOperation).setRequestProperty(Mockito.anyString(), Mockito.anyString()); + + tracingContext.constructHeader(abfsHttpOperation, null, null); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null); + + tracingContext.constructHeader(abfsHttpOperation, null, STATIC_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null); + + tracingContext.constructHeader(abfsHttpOperation, null, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), null, null); + + tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, null); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, null); + + tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION); + + tracingContext.constructHeader(abfsHttpOperation, CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), CONNECTION_TIMEOUT_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + + tracingContext.constructHeader(abfsHttpOperation, "503", null); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null); + + tracingContext.constructHeader(abfsHttpOperation, "503", STATIC_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null); + + tracingContext.constructHeader(abfsHttpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + checkHeaderForRetryPolicyAbbreviation(tracingContext.getHeader(), "503", null); + } + + private void checkHeaderForRetryPolicyAbbreviation(String header, String expectedFailureReason, String expectedRetryPolicyAbbreviation) { + String[] headerContents = header.split(":"); + String previousReqContext = headerContents[6]; + + if (expectedFailureReason != null) { + Assertions.assertThat(previousReqContext.split("_")[1]).describedAs( + "Failure reason Is not as expected").isEqualTo(expectedFailureReason); + if (expectedRetryPolicyAbbreviation != null) { + Assertions.assertThat(previousReqContext.split("_")).describedAs( + "Retry Count, Failure Reason and Retry Policy should be present").hasSize(3); + Assertions.assertThat(previousReqContext.split("_")[2]).describedAs( + "Retry policy is not as expected").isEqualTo(expectedRetryPolicyAbbreviation); + } else { + Assertions.assertThat(previousReqContext.split("_")).describedAs( + "Retry Count and Failure Reason should be present").hasSize(2); + } + } else { + Assertions.assertThat(previousReqContext.split("_")).describedAs( + "Only Retry Count should be present").hasSize(1); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java index cf7d51da4c..00c681fdad 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java @@ -39,6 +39,9 @@ import org.apache.hadoop.fs.azurebfs.utils.SASGenerator; import org.apache.hadoop.security.AccessControlException; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT; + /** * A mock SAS token provider implementation */ @@ -103,7 +106,8 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app requestBody.append(ske); requestBody.append(""); - AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders); + AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders, + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString()); op.sendRequest(requestBuffer, 0, requestBuffer.length); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index 875682fe20..b1b093d670 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -26,7 +26,9 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.assertj.core.api.Assertions; +import org.mockito.AdditionalMatchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -35,8 +37,12 @@ import org.apache.hadoop.util.functional.FunctionRaisingIOE; import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; @@ -46,6 +52,7 @@ * objects which are protected inside services package. */ public final class AbfsClientTestUtil { + private static final long ONE_SEC = 1000; private AbfsClientTestUtil() { @@ -55,7 +62,9 @@ public static void setMockAbfsRestOperationForListPathOperation( final AbfsClient spiedClient, FunctionRaisingIOE functionRaisingIOE) throws Exception { - ExponentialRetryPolicy retryPolicy = Mockito.mock(ExponentialRetryPolicy.class); + ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(ExponentialRetryPolicy.class); + StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); + AbfsThrottlingIntercept intercept = Mockito.mock(AbfsThrottlingIntercept.class); AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.ListPaths, @@ -68,7 +77,7 @@ public static void setMockAbfsRestOperationForListPathOperation( Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation( eq(AbfsRestOperationType.ListPaths), any(), any(), any()); - addGeneralMockBehaviourToAbfsClient(spiedClient, retryPolicy); + addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept); addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); functionRaisingIOE.apply(httpOperation); @@ -96,28 +105,54 @@ public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperat * Adding general mock behaviour to AbfsClient to avoid any NPE occurring. * These will avoid any network call made and will return the relevant exception or return value directly. * @param abfsClient to be mocked - * @param retryPolicy to be mocked + * @param exponentialRetryPolicy + * @param staticRetryPolicy * @throws IOException */ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient, - final ExponentialRetryPolicy retryPolicy) throws IOException { + final ExponentialRetryPolicy exponentialRetryPolicy, + final StaticRetryPolicy staticRetryPolicy, + final AbfsThrottlingIntercept intercept) throws IOException { Mockito.doReturn(OAuth).when(abfsClient).getAuthType(); Mockito.doReturn("").when(abfsClient).getAccessToken(); - AbfsThrottlingIntercept intercept = Mockito.mock( - AbfsThrottlingIntercept.class); + Mockito.doReturn(intercept).when(abfsClient).getIntercept(); Mockito.doNothing() .when(intercept) .sendingRequest(any(), nullable(AbfsCounters.class)); Mockito.doNothing().when(intercept).updateMetrics(any(), any()); - Mockito.doReturn(retryPolicy).when(abfsClient).getRetryPolicy(); - Mockito.doReturn(true) - .when(retryPolicy) - .shouldRetry(nullable(Integer.class), nullable(Integer.class)); - Mockito.doReturn(false).when(retryPolicy).shouldRetry(0, HTTP_OK); - Mockito.doReturn(false).when(retryPolicy).shouldRetry(1, HTTP_OK); - Mockito.doReturn(false).when(retryPolicy).shouldRetry(2, HTTP_OK); + // Returning correct retry policy based on failure reason + Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getExponentialRetryPolicy(); + Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); + Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy( + AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION))); + + // Defining behavior of static retry policy + Mockito.doReturn(true).when(staticRetryPolicy) + .shouldRetry(nullable(Integer.class), nullable(Integer.class)); + Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(1, HTTP_OK); + Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, HTTP_OK); + Mockito.doReturn(true).when(staticRetryPolicy).shouldRetry(1, HTTP_UNAVAILABLE); + // We want only two retries to occcur + Mockito.doReturn(false).when(staticRetryPolicy).shouldRetry(2, HTTP_UNAVAILABLE); + Mockito.doReturn(STATIC_RETRY_POLICY_ABBREVIATION).when(staticRetryPolicy).getAbbreviation(); + Mockito.doReturn(ONE_SEC).when(staticRetryPolicy).getRetryInterval(nullable(Integer.class)); + + // Defining behavior of exponential retry policy + Mockito.doReturn(true).when(exponentialRetryPolicy) + .shouldRetry(nullable(Integer.class), nullable(Integer.class)); + Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(1, HTTP_OK); + Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, HTTP_OK); + Mockito.doReturn(true).when(exponentialRetryPolicy).shouldRetry(1, HTTP_UNAVAILABLE); + // We want only two retries to occcur + Mockito.doReturn(false).when(exponentialRetryPolicy).shouldRetry(2, HTTP_UNAVAILABLE); + Mockito.doReturn(EXPONENTIAL_RETRY_POLICY_ABBREVIATION).when(exponentialRetryPolicy).getAbbreviation(); + Mockito.doReturn(2 * ONE_SEC).when(exponentialRetryPolicy).getRetryInterval(nullable(Integer.class)); + + AbfsConfiguration configurations = Mockito.mock(AbfsConfiguration.class); + Mockito.doReturn(configurations).when(abfsClient).getAbfsConfiguration(); + Mockito.doReturn(true).when(configurations).getStaticRetryForConnectionTimeoutEnabled(); } public static void hookOnRestOpsForTracingContextSingularity(AbfsClient client) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 08af6375a1..b031271a51 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -52,6 +52,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION; @@ -77,7 +78,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_NAME; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLUSTER_TYPE; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_VALUE_UNKNOWN; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME; /** @@ -365,7 +365,9 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.getAuthType()).thenReturn(currentAuthType); - when(client.getRetryPolicy()).thenReturn( + when(client.getExponentialRetryPolicy()).thenReturn( + new ExponentialRetryPolicy(1)); + when(client.getRetryPolicy(any())).thenReturn( new ExponentialRetryPolicy(1)); when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); @@ -560,7 +562,7 @@ public void testExpectHundredContinue() throws Exception { appendRequestParameters.getLength(), null)); AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, - HTTP_METHOD_PUT, requestHeaders)); + HTTP_METHOD_PUT, requestHeaders, DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT)); // Sets the expect request property if expect header is enabled. if (appendRequestParameters.isExpectHeaderEnabled()) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index 16a47d15f5..32897355f1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -53,6 +53,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION; @@ -202,7 +204,8 @@ private AbfsRestOperation getRestOperation() throws Exception { appendRequestParameters.getoffset(), appendRequestParameters.getLength(), null)); - AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders)); + AbfsHttpOperation abfsHttpOperation = Mockito.spy(new AbfsHttpOperation(url, HTTP_METHOD_PUT, requestHeaders, + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT)); // Sets the expect request property if expect header is enabled. if (expectHeaderEnabled) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java similarity index 89% rename from hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java rename to hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java index 12ab4e9ead..13323eb2a2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java @@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.junit.Assert; import org.junit.Test; import org.apache.hadoop.conf.Configuration; @@ -56,9 +55,9 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; /** - * Unit test TestExponentialRetryPolicy. + * Unit test ITestExponentialRetryPolicy. */ -public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { +public class ITestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { private final int maxRetryCount = 30; private final int noRetryCount = 0; private final int retryCount = new Random().nextInt(maxRetryCount); @@ -68,7 +67,7 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { private static final int ANALYSIS_PERIOD = 10000; - public TestExponentialRetryPolicy() throws Exception { + public ITestExponentialRetryPolicy() throws Exception { super(); } @@ -86,9 +85,10 @@ public void testDifferentMaxIORetryCount() throws Exception { @Test public void testDefaultMaxIORetryCount() throws Exception { AbfsConfiguration abfsConfig = getAbfsConfig(); - Assert.assertEquals( - String.format("default maxIORetry count is %s.", maxRetryCount), - maxRetryCount, abfsConfig.getMaxIoRetries()); + Assertions.assertThat(abfsConfig.getMaxIoRetries()) + .describedAs("Max retry count should be %s", maxRetryCount) + .isEqualTo(maxRetryCount); + testMaxIOConfig(abfsConfig); } @@ -265,7 +265,7 @@ public void testAbfsConfigConstructor() throws Exception { ExponentialRetryPolicy template = new ExponentialRetryPolicy( getAbfsConfig().getMaxIoRetries()); int testModifier = 1; - int expectedMaxRetries = template.getRetryCount() + testModifier; + int expectedMaxRetries = template.getMaxRetryCount() + testModifier; int expectedMinBackoff = template.getMinBackoff() + testModifier; int expectedMaxBackoff = template.getMaxBackoff() + testModifier; int expectedDeltaBackoff = template.getDeltaBackoff() + testModifier; @@ -279,10 +279,18 @@ public void testAbfsConfigConstructor() throws Exception { ExponentialRetryPolicy policy = new ExponentialRetryPolicy( new AbfsConfiguration(config, "dummyAccountName")); - Assert.assertEquals("Max retry count was not set as expected.", expectedMaxRetries, policy.getRetryCount()); - Assert.assertEquals("Min backoff interval was not set as expected.", expectedMinBackoff, policy.getMinBackoff()); - Assert.assertEquals("Max backoff interval was not set as expected.", expectedMaxBackoff, policy.getMaxBackoff()); - Assert.assertEquals("Delta backoff interval was not set as expected.", expectedDeltaBackoff, policy.getDeltaBackoff()); + Assertions.assertThat(policy.getMaxRetryCount()) + .describedAs("Max retry count was not set as expected.") + .isEqualTo(expectedMaxRetries); + Assertions.assertThat(policy.getMinBackoff()) + .describedAs("Min backoff interval was not set as expected.") + .isEqualTo(expectedMinBackoff); + Assertions.assertThat(policy.getMaxBackoff()) + .describedAs("Max backoff interval was not set as expected") + .isEqualTo(expectedMaxBackoff); + Assertions.assertThat(policy.getDeltaBackoff()) + .describedAs("Delta backoff interval was not set as expected.") + .isEqualTo(expectedDeltaBackoff); } private AbfsConfiguration getAbfsConfig() throws Exception { @@ -297,14 +305,14 @@ private void testMaxIOConfig(AbfsConfiguration abfsConfig) { int localRetryCount = 0; while (localRetryCount < abfsConfig.getMaxIoRetries()) { - Assert.assertTrue( - "Retry should be allowed when retryCount less than max count configured.", - retryPolicy.shouldRetry(localRetryCount, -1)); + Assertions.assertThat(retryPolicy.shouldRetry(localRetryCount, -1)) + .describedAs("Retry should be allowed when retryCount less than max count configured.") + .isTrue(); localRetryCount++; } - Assert.assertEquals( - "When all retries are exhausted, the retryCount will be same as max configured", - abfsConfig.getMaxIoRetries(), localRetryCount); + Assertions.assertThat(localRetryCount) + .describedAs("When all retries are exhausted, the retryCount will be same as max configured.") + .isEqualTo(abfsConfig.getMaxIoRetries()); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java new file mode 100644 index 0000000000..9b4467c1db --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestStaticRetryPolicy.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_STATIC_RETRY_INTERVAL; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; + +/** + * Class to test the behavior of Static Retry policy as well the inheritance + * between {@link AbfsRetryPolicy}, {@link ExponentialRetryPolicy}, {@link StaticRetryPolicy} + */ +public class ITestStaticRetryPolicy extends AbstractAbfsIntegrationTest { + + public ITestStaticRetryPolicy() throws Exception { + super(); + } + + /** + * Tests for retry policy related configurations. + * Asserting that the correct retry policy is used for a given set of + * configurations including default ones + * @throws Exception + */ + @Test + public void testStaticRetryPolicyInitializationDefault() throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + assertInitialization(config, StaticRetryPolicy.class); + } + + @Test + public void testStaticRetryPolicyInitialization1() throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true"); + assertInitialization(config, StaticRetryPolicy.class); + } + + @Test + public void testStaticRetryPolicyInitialization2() throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "false"); + assertInitialization(config, ExponentialRetryPolicy.class); + } + + private void assertInitialization(Configuration config, Class retryPolicyClass) throws Exception{ + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem + .newInstance(getFileSystem().getUri(), config); + AbfsClient client = fs.getAbfsStore().getClient(); + + // Assert that static retry policy will be used only for CT Failures + AbfsRetryPolicy retryPolicy = client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); + Assertions.assertThat(retryPolicy) + .describedAs("RetryPolicy Type is Not As Expected") + .isInstanceOf(retryPolicyClass); + + // For all other possible values of failureReason, Exponential retry is used + retryPolicy = client.getRetryPolicy(""); + assertIsExponentialRetryPolicy(retryPolicy); + retryPolicy = client.getRetryPolicy(null); + assertIsExponentialRetryPolicy(retryPolicy); + retryPolicy = client.getRetryPolicy(CONNECTION_RESET_ABBREVIATION); + assertIsExponentialRetryPolicy(retryPolicy); + } + + /** + * Test to assert that static retry policy returns the same retry interval + * independent of retry count + * @throws Exception + */ + @Test + public void testStaticRetryInterval() throws Exception { + Configuration config = new Configuration(this.getRawConfiguration()); + long retryInterval = 1000; + int maxIoRetry = 5; + config.set(AZURE_STATIC_RETRY_FOR_CONNECTION_TIMEOUT_ENABLED, "true"); + config.set(AZURE_STATIC_RETRY_INTERVAL, "1000"); + config.set(AZURE_MAX_IO_RETRIES, "5"); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem + .newInstance(getFileSystem().getUri(), config); + AbfsClient client = fs.getAbfsStore().getClient(); + + AbfsRetryPolicy retryPolicy = client.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); + assertIsStaticRetryPolicy(retryPolicy); + + Assertions.assertThat(retryPolicy.shouldRetry(0, -1)) + .describedAs("Should retry should be true") + .isEqualTo(true); + Assertions.assertThat(retryPolicy.getRetryInterval(0)) + .describedAs("Retry Interval Value Not as expected") + .isEqualTo(retryInterval); + Assertions.assertThat(retryPolicy.getRetryInterval(1)) + .describedAs("Retry Interval Value Not as expected") + .isEqualTo(retryInterval); + Assertions.assertThat(retryPolicy.getRetryInterval(2)) + .describedAs("Retry Interval Value Not as expected") + .isEqualTo(retryInterval); + Assertions.assertThat(retryPolicy.getRetryInterval(3)) + .describedAs("Retry Interval Value Not as expected") + .isEqualTo(retryInterval); + Assertions.assertThat(retryPolicy.shouldRetry(maxIoRetry, -1)) + .describedAs("Should retry for maxretrycount should be false") + .isEqualTo(false); + } + + private void assertIsExponentialRetryPolicy(AbfsRetryPolicy retryPolicy) { + Assertions.assertThat(retryPolicy) + .describedAs("Exponential Retry policy must be used") + .isInstanceOf(ExponentialRetryPolicy.class); + } + + private void assertIsStaticRetryPolicy(AbfsRetryPolicy retryPolicy) { + Assertions.assertThat(retryPolicy) + .describedAs("Static Retry policy must be used") + .isInstanceOf(StaticRetryPolicy.class); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java index ef52f244f7..b7fb892362 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT; import static org.assertj.core.api.Assertions.assertThat; /** @@ -74,7 +76,8 @@ public void verifyDisablingOfTracker() throws Exception { try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", "disablingCallee")) { - AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); tracker.registerResult(op).registerSuccess(true); } @@ -92,7 +95,8 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -131,7 +135,8 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -170,7 +175,8 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -205,7 +211,8 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -269,7 +276,8 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -303,7 +311,8 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -363,7 +372,8 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { Instant testInstant = Instant.now(); AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); verifyNoException(abfsPerfTrackerDisabled); verifyNoException(abfsPerfTrackerEnabled); @@ -371,7 +381,8 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { Instant testInstant = Instant.now(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList(), + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); try ( AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index cef1c9ae5a..1c53e62dd5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -25,6 +25,7 @@ import java.time.Duration; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; import org.apache.hadoop.fs.statistics.IOStatistics; import org.assertj.core.api.Assertions; @@ -47,6 +48,8 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_ALREADY_EXISTS; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; @@ -158,6 +161,10 @@ AbfsClient getMockAbfsClient() throws IOException { // adding mock objects to current AbfsClient AbfsClient spyClient = Mockito.spy(fs.getAbfsStore().getClient()); + AbfsConfiguration spiedConf = Mockito.spy(fs.getAbfsStore().getAbfsConfiguration()); + Mockito.doReturn(DEFAULT_HTTP_CONNECTION_TIMEOUT).when(spiedConf).getHttpConnectionTimeout(); + Mockito.doReturn(DEFAULT_HTTP_READ_TIMEOUT).when(spiedConf).getHttpReadTimeout(); + Mockito.doReturn(spiedConf).when(spyClient).getAbfsConfiguration(); Mockito.doAnswer(answer -> { AbfsRestOperation op = new AbfsRestOperation(AbfsRestOperationType.RenamePath, @@ -191,9 +198,7 @@ private void addSpyBehavior(final AbfsRestOperation spiedRestOp, normalOp2.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, client.getAccessToken()); - when(spiedRestOp.createHttpOperation()) - .thenReturn(failingOperation) - .thenReturn(normalOp2); + Mockito.doReturn(failingOperation).doReturn(normalOp2).when(spiedRestOp).createHttpOperation(); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index b302a1fa93..7f422582e7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -29,6 +29,7 @@ import org.mockito.Mockito; import org.mockito.stubbing.Stubber; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; @@ -37,8 +38,11 @@ import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToRestOpAndHttpOp; + import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; @@ -54,6 +58,8 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; public class TestAbfsRestOperationMockFailures { @@ -63,7 +69,7 @@ public void testClientRequestIdForConnectTimeoutRetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE); abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1); } @Test @@ -75,7 +81,7 @@ public void testClientRequestIdForConnectAndReadTimeoutRetry() abbreviations[0] = CONNECTION_TIMEOUT_ABBREVIATION; exceptions[1] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE); abbreviations[1] = READ_TIMEOUT_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 1); } @Test @@ -84,7 +90,7 @@ public void testClientRequestIdForReadTimeoutRetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new SocketTimeoutException(READ_TIMEOUT_JDK_MESSAGE); abbreviations[0] = READ_TIMEOUT_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0); } @Test @@ -93,7 +99,7 @@ public void testClientRequestIdForUnknownHostRetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new UnknownHostException(); abbreviations[0] = UNKNOWN_HOST_EXCEPTION_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0); } @Test @@ -102,7 +108,7 @@ public void testClientRequestIdForConnectionResetRetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new SocketTimeoutException(CONNECTION_RESET_MESSAGE + " by peer"); abbreviations[0] = CONNECTION_RESET_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0); } @Test @@ -111,7 +117,7 @@ public void testClientRequestIdForUnknownSocketExRetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new SocketException("unknown"); abbreviations[0] = SOCKET_EXCEPTION_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0); } @Test @@ -120,7 +126,7 @@ public void testClientRequestIdForIOERetry() throws Exception { String[] abbreviations = new String[1]; exceptions[0] = new InterruptedIOException(); abbreviations[0] = IO_EXCEPTION_ABBREVIATION; - testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1); + testClientRequestIdForTimeoutRetry(exceptions, abbreviations, 1, 0); } @Test @@ -158,16 +164,115 @@ public void testClientRequestIdFor503OtherRetry() throws Exception { testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503"); } + /** + * Test for mocking the failure scenario with retry policy assertions. + * Here we will try to create a request with following life cycle: + * 1. Primary Request made fails with Connection Timeout and fall into retry loop + * 2. Retried request fails with 503 and again go for retry + * 3. Retried request fails with 503 and do not go for retry. + * + * We will try to assert that: + * 1. Correct retry policy is used to get the retry interval for each failed request + * 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used + * @throws Exception + */ + + @Test + public void testRetryPolicyWithDifferentFailureReasons() throws Exception { + + AbfsClient abfsClient = Mockito.mock(AbfsClient.class); + ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( + ExponentialRetryPolicy.class); + StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); + AbfsThrottlingIntercept intercept = Mockito.mock( + AbfsThrottlingIntercept.class); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + + AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( + AbfsRestOperationType.ReadFile, + abfsClient, + "PUT", + null, + new ArrayList<>() + )); + + AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); + addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); + + Stubber stubber = Mockito.doThrow(new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE)); + stubber.doNothing().when(httpOperation).processResponse( + nullable(byte[].class), nullable(int.class), nullable(int.class)); + + when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_UNAVAILABLE); + + TracingContext tracingContext = Mockito.mock(TracingContext.class); + Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class)); + Mockito.doReturn("").when(httpOperation).getStorageErrorMessage(); + Mockito.doReturn("").when(httpOperation).getStorageErrorCode(); + Mockito.doReturn("HEAD").when(httpOperation).getMethod(); + Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any()); + + try { + // Operation will fail with CT first and then 503 thereafter. + abfsRestOperation.execute(tracingContext); + } catch(AbfsRestOperationException ex) { + Assertions.assertThat(ex.getStatusCode()) + .describedAs("Status Code must be HTTP_UNAVAILABLE(409)") + .isEqualTo(HTTP_UNAVAILABLE); + } + + // Assert that httpOperation.processResponse was called 3 times. + // One for retry count 0 + // One for retry count 1 after failing with CT + // One for retry count 2 after failing with 50 + Mockito.verify(httpOperation, times(3)).processResponse( + nullable(byte[].class), nullable(int.class), nullable(int.class)); + + // Assert that Static Retry Policy was used after CT failure. + // Iteration 1 failed with CT and shouldRetry was called with retry count 0 + // Before iteration 2 sleep will be computed using static retry policy and retry count 1 + Mockito.verify(abfsClient, Mockito.times(1)) + .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); + Mockito.verify(staticRetryPolicy, Mockito.times(1)) + .shouldRetry(0, -1); + Mockito.verify(staticRetryPolicy, Mockito.times(1)) + .getRetryInterval(1); + Mockito.verify(tracingContext, Mockito.times(1)) + .constructHeader(httpOperation, CONNECTION_TIMEOUT_ABBREVIATION, STATIC_RETRY_POLICY_ABBREVIATION); + + // Assert that exponential Retry Policy was used during second and third Iteration. + // Iteration 2 and 3 failed with 503 and should retry was called with retry count 1 and 2 + // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2 + // Should retry with retry count 2 will return false and no further requests will be made. + Mockito.verify(abfsClient, Mockito.times(2)) + .getRetryPolicy("503"); + Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) + .shouldRetry(1, HTTP_UNAVAILABLE); + Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) + .shouldRetry(2, HTTP_UNAVAILABLE); + Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) + .getRetryInterval(2); + Mockito.verify(tracingContext, Mockito.times(1)) + .constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + + // Assert that intercept.updateMetrics was called only once during second Iteration + Mockito.verify(intercept, Mockito.times(2)) + .updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class)); + } + private void testClientRequestIdForStatusRetry(int status, String serverErrorMessage, String keyExpected) throws Exception { AbfsClient abfsClient = Mockito.mock(AbfsClient.class); - ExponentialRetryPolicy retryPolicy = Mockito.mock( + ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( ExponentialRetryPolicy.class); - addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy); - + StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); + AbfsThrottlingIntercept intercept = Mockito.mock( + AbfsThrottlingIntercept.class); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + // Create a readfile operation that will fail AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.ReadFile, abfsClient, @@ -201,8 +306,7 @@ private void testClientRequestIdForStatusRetry(int status, TracingContext tracingContext = Mockito.mock(TracingContext.class); Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class)); - Mockito.doReturn(tracingContext) - .when(abfsRestOperation).createNewTracingContext(any()); + Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any()); int[] count = new int[1]; count[0] = 0; @@ -213,7 +317,7 @@ private void testClientRequestIdForStatusRetry(int status, } count[0]++; return null; - }).when(tracingContext).constructHeader(any(), any()); + }).when(tracingContext).constructHeader(any(), any(), any()); abfsRestOperation.execute(tracingContext); Assertions.assertThat(count[0]).isEqualTo(2); @@ -222,12 +326,14 @@ private void testClientRequestIdForStatusRetry(int status, private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, String[] abbreviationsExpected, - int len) throws Exception { + int len, int numOfCTExceptions) throws Exception { AbfsClient abfsClient = Mockito.mock(AbfsClient.class); - ExponentialRetryPolicy retryPolicy = Mockito.mock( + ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( ExponentialRetryPolicy.class); - addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy); - + StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); + AbfsThrottlingIntercept intercept = Mockito.mock( + AbfsThrottlingIntercept.class); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.ReadFile, @@ -265,9 +371,19 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, } count[0]++; return null; - }).when(tracingContext).constructHeader(any(), any()); + }).when(tracingContext).constructHeader(any(), any(), any()); abfsRestOperation.execute(tracingContext); Assertions.assertThat(count[0]).isEqualTo(len + 1); + + /** + * Assert that getRetryPolicy was called with + * failureReason CT only for Connection Timeout Cases. + * For every failed request getRetryPolicy will be called three times + * It will be called with failureReason CT for every request failing with CT + */ + Mockito.verify(abfsClient, Mockito.times( + numOfCTExceptions)) + .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java index 8e79288cf6..37a7a986e1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAzureADAuthenticator.java @@ -60,7 +60,7 @@ public void testDefaultOAuthTokenFetchRetryPolicy() throws Exception { ExponentialRetryPolicy retryPolicy = abfsConfig .getOauthTokenFetchRetryPolicy(); - Assertions.assertThat(retryPolicy.getRetryCount()).describedAs( + Assertions.assertThat(retryPolicy.getMaxRetryCount()).describedAs( "retryCount should be the default value {} as the same " + "is not configured", DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS) @@ -103,7 +103,7 @@ public void testOAuthTokenFetchRetryPolicy() ExponentialRetryPolicy retryPolicy = abfsConfig .getOauthTokenFetchRetryPolicy(); - Assertions.assertThat(retryPolicy.getRetryCount()) + Assertions.assertThat(retryPolicy.getMaxRetryCount()) .describedAs("retryCount should be {}", TEST_RETRY_COUNT) .isEqualTo(TEST_RETRY_COUNT); Assertions.assertThat(retryPolicy.getMinBackoff())