diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 6c0ecfcdf8..12e687c15b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -42,8 +42,14 @@ public enum AzureServiceErrorCode { INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null), RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), - INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), - EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), + INGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE, + "Ingress is over the account limit."), + EGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE, + "Egress is over the account limit."), + TPS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE, + "Operations per second is over the account limit."), + OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE, + "The server is currently unable to receive requests. Please retry your request."), INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index ee99f99280..1ab1c7a0af 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -167,7 +167,7 @@ private AbfsClient(final URL baseUrl, DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption()); sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName(); } catch (IOException e) { - // Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact. + // Suppress exception, failure to init DelegatingSSLSocketFactory would have only performance impact. 
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : " + "{}", e.getMessage()); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index e901196bcc..4abe9a574a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -39,7 +39,9 @@ import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION; /** * The AbfsRestOperation for Rest AbfsClient. @@ -283,7 +285,8 @@ String getClientLatency() { private boolean executeHttpOperation(final int retryCount, TracingContext tracingContext) throws AzureBlobFileSystemException { AbfsHttpOperation httpOperation; - boolean wasIOExceptionThrown = false; + // Used to avoid CST Metric Update in Case of UnknownHost/IO Exception. + boolean wasKnownExceptionThrown = false; try { // initialize the HTTP request and open the connection @@ -321,7 +324,27 @@ private boolean executeHttpOperation(final int retryCount, } else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1); } + + // If no exception occurred till here it means http operation was successfully complete and + // a response from server has been received which might be failure or success. + // If any kind of exception has occurred it will be caught below. + // If request failed to determine failure reason and retry policy here. + // else simply return with success after saving the result. + LOG.debug("HttpRequest: {}: {}", operationType, httpOperation); + + int status = httpOperation.getStatusCode(); + failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); + retryPolicy = client.getRetryPolicy(failureReason); + + if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) { + return false; + } + + // If the request has succeeded or failed with non-retrial error, save the operation and return. 
+ result = httpOperation; + } catch (UnknownHostException ex) { + wasKnownExceptionThrown = true; String hostname = null; hostname = httpOperation.getHost(); failureReason = RetryReason.getAbbreviation(ex, null, null); @@ -333,57 +356,27 @@ private boolean executeHttpOperation(final int retryCount, } return false; } catch (IOException ex) { + wasKnownExceptionThrown = true; if (LOG.isDebugEnabled()) { LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex); } failureReason = RetryReason.getAbbreviation(ex, -1, ""); retryPolicy = client.getRetryPolicy(failureReason); - wasIOExceptionThrown = true; if (!retryPolicy.shouldRetry(retryCount, -1)) { throw new InvalidAbfsRestOperationException(ex, retryCount); } return false; } finally { - int status = httpOperation.getStatusCode(); - /* - A status less than 300 (2xx range) or greater than or equal - to 500 (5xx range) should contribute to throttling metrics being updated. - Less than 200 or greater than or equal to 500 show failed operations. 2xx - range contributes to successful operations. 3xx range is for redirects - and 4xx range is for user errors. These should not be a part of - throttling backoff computation. - */ - boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE - || status >= HttpURLConnection.HTTP_INTERNAL_ERROR); - - /* - Connection Timeout failures should not contribute to throttling - In case the current request fails with Connection Timeout we will have - ioExceptionThrown true and failure reason as CT - In case the current request failed with 5xx, failure reason will be - updated after finally block but wasIOExceptionThrown will be false; - */ - boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown; - - if (updateMetricsResponseCode && !isCTFailure) { + int statusCode = httpOperation.getStatusCode(); + // Update Metrics only if Succeeded or Throttled due to account limits. + // Also Update in case of any unhandled exception is thrown. + if (shouldUpdateCSTMetrics(statusCode) && !wasKnownExceptionThrown) { intercept.updateMetrics(operationType, httpOperation); } } - LOG.debug("HttpRequest: {}: {}", operationType, httpOperation); - - int status = httpOperation.getStatusCode(); - failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); - retryPolicy = client.getRetryPolicy(failureReason); - - if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) { - return false; - } - - result = httpOperation; - return true; } @@ -443,6 +436,34 @@ private void incrementCounter(AbfsStatistic statistic, long value) { } } + /** + * Updating Client Side Throttling Metrics for relevant response status codes. + * Following criteria is used to decide based on status code and failure reason. + *
+   * <ol>
+   *   <li>Case 1: Status code in 2xx range: Successful Operations should contribute</li>
+   *   <li>Case 2: Status code in 3xx range: Redirection Operations should not contribute</li>
+   *   <li>Case 3: Status code in 4xx range: User Errors should not contribute</li>
+   *   <li>
+   *     Case 4: Status code is 503: Throttling Error should contribute as following:
+   *     <ol>
+   *       <li>Case 4.a: Ingress Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.b: Egress Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.c: TPS Over Account Limit: Should Contribute</li>
+   *       <li>Case 4.d: Other Server Throttling: Should not contribute</li>
+   *     </ol>
+   *   </li>
+   *   <li>Case 5: Status code in 5xx range other than 503: Should not contribute</li>
+   * </ol>
+ * @param statusCode + * @return + */ + private boolean shouldUpdateCSTMetrics(final int statusCode) { + return statusCode < HttpURLConnection.HTTP_MULT_CHOICE // Case 1 + || INGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.a + || EGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.b + || TPS_LIMIT_BREACH_ABBREVIATION.equals(failureReason); // Case 4.c + } + /** * Creates a new Tracing context before entering the retry loop of a rest operation. * This will ensure all rest operations have unique diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java index 8a0af183e3..42d8587aa6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RetryReasonConstants.java @@ -26,13 +26,13 @@ private RetryReasonConstants() { public static final String CONNECTION_TIMEOUT_JDK_MESSAGE = "connect timed out"; public static final String READ_TIMEOUT_JDK_MESSAGE = "Read timed out"; public static final String CONNECTION_RESET_MESSAGE = "Connection reset"; - public static final String OPERATION_BREACH_MESSAGE = "Operations per second is over the account limit."; public static final String CONNECTION_RESET_ABBREVIATION = "CR"; public static final String CONNECTION_TIMEOUT_ABBREVIATION = "CT"; public static final String READ_TIMEOUT_ABBREVIATION = "RT"; public static final String INGRESS_LIMIT_BREACH_ABBREVIATION = "ING"; public static final String EGRESS_LIMIT_BREACH_ABBREVIATION = "EGR"; - public static final String OPERATION_LIMIT_BREACH_ABBREVIATION = "OPR"; + public static final String TPS_LIMIT_BREACH_ABBREVIATION = "OPR"; + public static final String OTHER_SERVER_THROTTLING_ABBREVIATION = "OTH"; public static final String UNKNOWN_HOST_EXCEPTION_ABBREVIATION = "UH"; public static final String IO_EXCEPTION_ABBREVIATION = "IOE"; public static final String SOCKET_EXCEPTION_ABBREVIATION = "SE"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java index dd67a0cb8c..727dcfd8dc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/retryReasonCategories/ServerErrorRetryReason.java @@ -22,10 +22,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_STATUS_CATEGORY_QUOTIENT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; -import static 
org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION; /** * Category that can capture server-response errors for 5XX status-code. @@ -56,9 +58,13 @@ String getAbbreviation(final Integer statusCode, splitedServerErrorMessage)) { return EGRESS_LIMIT_BREACH_ABBREVIATION; } - if (OPERATION_BREACH_MESSAGE.equalsIgnoreCase( + if (TPS_OVER_ACCOUNT_LIMIT.getErrorMessage().equalsIgnoreCase( splitedServerErrorMessage)) { - return OPERATION_LIMIT_BREACH_ABBREVIATION; + return TPS_LIMIT_BREACH_ABBREVIATION; + } + if (OTHER_SERVER_THROTTLING.getErrorMessage().equalsIgnoreCase( + splitedServerErrorMessage)) { + return OTHER_SERVER_THROTTLING_ABBREVIATION; } return HTTP_UNAVAILABLE + ""; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index 32897355f1..41cbc3be3b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -233,6 +234,11 @@ private AbfsRestOperation getRestOperation() throws Exception { // mocked the response code and the response message to check different // behaviour based on response code. 
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode(); + if (responseCode == HTTP_UNAVAILABLE) { + Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()) + .when(abfsHttpOperation) + .getStorageErrorMessage(); + } Mockito.doReturn(responseMessage) .when(abfsHttpOperation) .getConnResponseMessage(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index 7f422582e7..078b42cf0d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -38,6 +38,8 @@ import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient; @@ -50,8 +52,8 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION; @@ -62,6 +64,9 @@ import static org.mockito.Mockito.when; public class TestAbfsRestOperationMockFailures { + // In these tests a request first fails with given exceptions and then succeed on retry. + // Client Side Throttling Metrics will be updated at least for retried request which succeeded. + // For original requests it will be updated only for EGR, IGR, OPR throttling. 
@Test public void testClientRequestIdForConnectTimeoutRetry() throws Exception { @@ -131,37 +136,48 @@ public void testClientRequestIdForIOERetry() throws Exception { @Test public void testClientRequestIdFor400Retry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400"); + testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400", 1); } @Test public void testClientRequestIdFor500Retry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500"); + testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500", 1); } @Test public void testClientRequestIdFor503INGRetry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, - INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), - INGRESS_LIMIT_BREACH_ABBREVIATION); + testClientRequestIdForStatusRetry( + HTTP_UNAVAILABLE, + INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), + INGRESS_LIMIT_BREACH_ABBREVIATION, + 2); } @Test - public void testClientRequestIdFor503egrRetry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, - EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), - EGRESS_LIMIT_BREACH_ABBREVIATION); + public void testClientRequestIdFor503EGRRetry() throws Exception { + testClientRequestIdForStatusRetry( + HTTP_UNAVAILABLE, + EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), + EGRESS_LIMIT_BREACH_ABBREVIATION, + 2); } @Test public void testClientRequestIdFor503OPRRetry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, - OPERATION_BREACH_MESSAGE, OPERATION_LIMIT_BREACH_ABBREVIATION); + testClientRequestIdForStatusRetry( + HTTP_UNAVAILABLE, + TPS_OVER_ACCOUNT_LIMIT.getErrorMessage(), + TPS_LIMIT_BREACH_ABBREVIATION, + 2); } @Test public void testClientRequestIdFor503OtherRetry() throws Exception { - testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503"); + testClientRequestIdForStatusRetry( + HTTP_UNAVAILABLE, + OTHER_SERVER_THROTTLING.getErrorMessage(), + OTHER_SERVER_THROTTLING_ABBREVIATION, + 1); } /** @@ -176,7 +192,6 @@ public void testClientRequestIdFor503OtherRetry() throws Exception { * 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used * @throws Exception */ - @Test public void testRetryPolicyWithDifferentFailureReasons() throws Exception { @@ -210,6 +225,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { Mockito.doReturn("").when(httpOperation).getStorageErrorMessage(); Mockito.doReturn("").when(httpOperation).getStorageErrorCode(); Mockito.doReturn("HEAD").when(httpOperation).getMethod(); + Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage(); Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any()); try { @@ -217,20 +233,18 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { abfsRestOperation.execute(tracingContext); } catch(AbfsRestOperationException ex) { Assertions.assertThat(ex.getStatusCode()) - .describedAs("Status Code must be HTTP_UNAVAILABLE(409)") + .describedAs("Status Code must be HTTP_UNAVAILABLE(503)") .isEqualTo(HTTP_UNAVAILABLE); } // Assert that httpOperation.processResponse was called 3 times. 
// One for retry count 0 // One for retry count 1 after failing with CT - // One for retry count 2 after failing with 50 + // One for retry count 2 after failing with 503 Mockito.verify(httpOperation, times(3)).processResponse( nullable(byte[].class), nullable(int.class), nullable(int.class)); - // Assert that Static Retry Policy was used after CT failure. - // Iteration 1 failed with CT and shouldRetry was called with retry count 0 - // Before iteration 2 sleep will be computed using static retry policy and retry count 1 + // Primary Request Failed with CT. Static Retry Policy should be used. Mockito.verify(abfsClient, Mockito.times(1)) .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); Mockito.verify(staticRetryPolicy, Mockito.times(1)) @@ -245,7 +259,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2 // Should retry with retry count 2 will return false and no further requests will be made. Mockito.verify(abfsClient, Mockito.times(2)) - .getRetryPolicy("503"); + .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION); Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) .shouldRetry(1, HTTP_UNAVAILABLE); Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) @@ -253,16 +267,17 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) .getRetryInterval(2); Mockito.verify(tracingContext, Mockito.times(1)) - .constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + .constructHeader(httpOperation, EGRESS_LIMIT_BREACH_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION); - // Assert that intercept.updateMetrics was called only once during second Iteration + // Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR. 
Mockito.verify(intercept, Mockito.times(2)) .updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class)); } private void testClientRequestIdForStatusRetry(int status, - String serverErrorMessage, - String keyExpected) throws Exception { + String serverErrorMessage, + String keyExpected, + int numOfTimesCSTMetricsUpdated) throws Exception { AbfsClient abfsClient = Mockito.mock(AbfsClient.class); ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( @@ -322,11 +337,14 @@ private void testClientRequestIdForStatusRetry(int status, abfsRestOperation.execute(tracingContext); Assertions.assertThat(count[0]).isEqualTo(2); + Mockito.verify(intercept, Mockito.times(numOfTimesCSTMetricsUpdated)).updateMetrics(any(), any()); + } private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, - String[] abbreviationsExpected, - int len, int numOfCTExceptions) throws Exception { + String[] abbreviationsExpected, + int len, + int numOfCTExceptions) throws Exception { AbfsClient abfsClient = Mockito.mock(AbfsClient.class); ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( ExponentialRetryPolicy.class); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java index 76fcc6dc2c..d9d8ee51f9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestRetryReason.java @@ -31,6 +31,7 @@ import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; @@ -38,8 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; -import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION; @@ -92,8 +92,8 @@ public void testIngressLimitRetryReason() { @Test public void testOperationLimitRetryReason() { - Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, OPERATION_BREACH_MESSAGE)).isEqualTo( - 
OPERATION_LIMIT_BREACH_ABBREVIATION + Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, TPS_OVER_ACCOUNT_LIMIT.getErrorMessage())).isEqualTo( + TPS_LIMIT_BREACH_ABBREVIATION ); }
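
With this patch a 503 response is sub-classified by its storage error message (now carried on the "ServerBusy" AzureServiceErrorCode entries) instead of the removed OPERATION_BREACH_MESSAGE constant. The sketch below is illustrative only, not part of the patch: the class and method names (ThrottlingAbbreviationSketch, abbreviationFor503) are invented, the abbreviation strings are hard-coded copies of RetryReasonConstants, and it assumes the message is reduced to its first line the same way ServerErrorRetryReason does with splitedServerErrorMessage.

```java
import java.net.HttpURLConnection;

// Minimal sketch of the new 503 sub-classification; names are illustrative.
public final class ThrottlingAbbreviationSketch {

  private static final String INGRESS_MSG = "Ingress is over the account limit.";
  private static final String EGRESS_MSG = "Egress is over the account limit.";
  private static final String TPS_MSG = "Operations per second is over the account limit.";
  private static final String OTHER_MSG =
      "The server is currently unable to receive requests. Please retry your request.";

  static String abbreviationFor503(String storageErrorMessage) {
    // Assumption: only the first line of the storage error message is compared,
    // mirroring the splitedServerErrorMessage handling in ServerErrorRetryReason.
    String firstLine = storageErrorMessage == null
        ? ""
        : storageErrorMessage.split(System.lineSeparator(), 2)[0].trim();
    if (INGRESS_MSG.equalsIgnoreCase(firstLine)) {
      return "ING"; // INGRESS_LIMIT_BREACH_ABBREVIATION
    }
    if (EGRESS_MSG.equalsIgnoreCase(firstLine)) {
      return "EGR"; // EGRESS_LIMIT_BREACH_ABBREVIATION
    }
    if (TPS_MSG.equalsIgnoreCase(firstLine)) {
      return "OPR"; // TPS_LIMIT_BREACH_ABBREVIATION
    }
    if (OTHER_MSG.equalsIgnoreCase(firstLine)) {
      return "OTH"; // OTHER_SERVER_THROTTLING_ABBREVIATION
    }
    return String.valueOf(HttpURLConnection.HTTP_UNAVAILABLE); // fall back to plain "503"
  }
}
```

For example, a 503 whose message starts with "Egress is over the account limit." now yields EGR rather than the generic "503", so the exponential retry policy is selected for it and it can feed the throttling metrics.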
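
The executeHttpOperation change replaces the old "status < 300 or >= 500, minus connection-timeout" rule with shouldUpdateCSTMetrics plus the wasKnownExceptionThrown flag. A minimal sketch of that gate, assuming the failure-reason abbreviations above; the standalone class, its name and the extra parameters are invented for illustration:

```java
import java.net.HttpURLConnection;
import java.util.Set;

// Sketch of the client-side-throttling (CST) metrics gate after this change.
final class CstMetricsGateSketch {

  // Account-limit throttling reasons that should feed the throttling intercept;
  // the generic "OTH" server-busy reason deliberately does not.
  private static final Set<String> ACCOUNT_LIMIT_REASONS = Set.of("ING", "EGR", "OPR");

  static boolean shouldUpdateCstMetrics(int statusCode, String failureReason,
      boolean wasKnownExceptionThrown) {
    // A caught UnknownHostException/IOException skips the update entirely,
    // mirroring the new wasKnownExceptionThrown flag in AbfsRestOperation.
    if (wasKnownExceptionThrown) {
      return false;
    }
    // Successful (2xx) responses and 503s caused by account limits contribute;
    // 3xx, 4xx, other 5xx and "other server throttling" 503s do not.
    return statusCode < HttpURLConnection.HTTP_MULT_CHOICE
        || (failureReason != null && ACCOUNT_LIMIT_REASONS.contains(failureReason));
  }
}
```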
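
Put together, the two sketches explain the new numOfTimesCSTMetricsUpdated expectations in testClientRequestIdForStatusRetry: 2 for ING/EGR/OPR (the throttled first attempt and the successful retry both count) and 1 otherwise (only the successful retry counts). The worked example below reuses the hypothetical classes from the sketches above and is not part of the patch:

```java
// Worked example of the expected CST-metric update counts (illustrative only).
public final class CstUpdateCountExample {

  public static void main(String[] args) {
    String egrMsg = "Egress is over the account limit.";
    String othMsg =
        "The server is currently unable to receive requests. Please retry your request.";

    // 503 with an account-limit message classifies as EGR, so the failed attempt
    // already counts towards throttling metrics; the successful retry counts again.
    String egrReason = ThrottlingAbbreviationSketch.abbreviationFor503(egrMsg);
    int egrUpdates =
        (CstMetricsGateSketch.shouldUpdateCstMetrics(503, egrReason, false) ? 1 : 0)
        + (CstMetricsGateSketch.shouldUpdateCstMetrics(200, null, false) ? 1 : 0);
    System.out.println("EGR scenario updates: " + egrUpdates); // 2

    // 503 with the generic server-busy message classifies as OTH, so only the
    // successful retry updates the metrics.
    String othReason = ThrottlingAbbreviationSketch.abbreviationFor503(othMsg);
    int othUpdates =
        (CstMetricsGateSketch.shouldUpdateCstMetrics(503, othReason, false) ? 1 : 0)
        + (CstMetricsGateSketch.shouldUpdateCstMetrics(200, null, false) ? 1 : 0);
    System.out.println("OTH scenario updates: " + othUpdates); // 1
  }
}
```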