HADOOP-19096. [ABFS] [CST Optimization] Enhance Client-Side Throttling Metrics Logic (#6276)

ABFS has a client-side throttling mechanism which works on the metrics collected
from past requests

When requests are fail due to server-side throttling it updates its
metrics and recalculates any client side backoff.

The choice of which requests should be used to compute client side
backoff interval is based on the http status code:

- Status code in 2xx range: Successful Operations should contribute.
- Status code in 3xx range: Redirection Operations should not contribute.
- Status code in 4xx range: User Errors should not contribute.
- Status code is 503: Throttling Error should contribute only if they
  are due to client limits breach as follows:
  * 503, Ingress Over Account Limit: Should Contribute
  * 503, Egress Over Account Limit: Should Contribute
  * 503, TPS Over Account Limit: Should Contribute
  * 503, Other Server Throttling: Should not Contribute.
- Status code in 5xx range other than 503: Should not Contribute.
- IOException and UnknownHostExceptions: Should not Contribute.

Contributed by Anuj Modi
This commit is contained in:
Anuj Modi 2024-04-10 19:16:23 +05:30 committed by GitHub
parent 281e2d288d
commit dbe2d61258
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 134 additions and 77 deletions

View File

@ -42,8 +42,14 @@ public enum AzureServiceErrorCode {
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null), INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null),
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null),
INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null),
INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), INGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), "Ingress is over the account limit."),
EGRESS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Egress is over the account limit."),
TPS_OVER_ACCOUNT_LIMIT("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"Operations per second is over the account limit."),
OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE,
"The server is currently unable to receive requests. Please retry your request."),
INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null), ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),

View File

@ -167,7 +167,7 @@ private AbfsClient(final URL baseUrl,
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption()); DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName(); sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
} catch (IOException e) { } catch (IOException e) {
// Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact. // Suppress exception, failure to init DelegatingSSLSocketFactory would have only performance impact.
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : " LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
+ "{}", e.getMessage()); + "{}", e.getMessage());
} }

View File

@ -39,7 +39,9 @@
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding; import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
/** /**
* The AbfsRestOperation for Rest AbfsClient. * The AbfsRestOperation for Rest AbfsClient.
@ -283,7 +285,8 @@ String getClientLatency() {
private boolean executeHttpOperation(final int retryCount, private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException { TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation; AbfsHttpOperation httpOperation;
boolean wasIOExceptionThrown = false; // Used to avoid CST Metric Update in Case of UnknownHost/IO Exception.
boolean wasKnownExceptionThrown = false;
try { try {
// initialize the HTTP request and open the connection // initialize the HTTP request and open the connection
@ -321,7 +324,27 @@ private boolean executeHttpOperation(final int retryCount,
} else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { } else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1); incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
} }
// If no exception occurred till here it means http operation was successfully complete and
// a response from server has been received which might be failure or success.
// If any kind of exception has occurred it will be caught below.
// If request failed to determine failure reason and retry policy here.
// else simply return with success after saving the result.
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);
if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
}
// If the request has succeeded or failed with non-retrial error, save the operation and return.
result = httpOperation;
} catch (UnknownHostException ex) { } catch (UnknownHostException ex) {
wasKnownExceptionThrown = true;
String hostname = null; String hostname = null;
hostname = httpOperation.getHost(); hostname = httpOperation.getHost();
failureReason = RetryReason.getAbbreviation(ex, null, null); failureReason = RetryReason.getAbbreviation(ex, null, null);
@ -333,57 +356,27 @@ private boolean executeHttpOperation(final int retryCount,
} }
return false; return false;
} catch (IOException ex) { } catch (IOException ex) {
wasKnownExceptionThrown = true;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex); LOG.debug("HttpRequestFailure: {}, {}", httpOperation, ex);
} }
failureReason = RetryReason.getAbbreviation(ex, -1, ""); failureReason = RetryReason.getAbbreviation(ex, -1, "");
retryPolicy = client.getRetryPolicy(failureReason); retryPolicy = client.getRetryPolicy(failureReason);
wasIOExceptionThrown = true;
if (!retryPolicy.shouldRetry(retryCount, -1)) { if (!retryPolicy.shouldRetry(retryCount, -1)) {
throw new InvalidAbfsRestOperationException(ex, retryCount); throw new InvalidAbfsRestOperationException(ex, retryCount);
} }
return false; return false;
} finally { } finally {
int status = httpOperation.getStatusCode(); int statusCode = httpOperation.getStatusCode();
/* // Update Metrics only if Succeeded or Throttled due to account limits.
A status less than 300 (2xx range) or greater than or equal // Also Update in case of any unhandled exception is thrown.
to 500 (5xx range) should contribute to throttling metrics being updated. if (shouldUpdateCSTMetrics(statusCode) && !wasKnownExceptionThrown) {
Less than 200 or greater than or equal to 500 show failed operations. 2xx
range contributes to successful operations. 3xx range is for redirects
and 4xx range is for user errors. These should not be a part of
throttling backoff computation.
*/
boolean updateMetricsResponseCode = (status < HttpURLConnection.HTTP_MULT_CHOICE
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
/*
Connection Timeout failures should not contribute to throttling
In case the current request fails with Connection Timeout we will have
ioExceptionThrown true and failure reason as CT
In case the current request failed with 5xx, failure reason will be
updated after finally block but wasIOExceptionThrown will be false;
*/
boolean isCTFailure = CONNECTION_TIMEOUT_ABBREVIATION.equals(failureReason) && wasIOExceptionThrown;
if (updateMetricsResponseCode && !isCTFailure) {
intercept.updateMetrics(operationType, httpOperation); intercept.updateMetrics(operationType, httpOperation);
} }
} }
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);
int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);
if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
}
result = httpOperation;
return true; return true;
} }
@ -443,6 +436,34 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
} }
} }
/**
* Updating Client Side Throttling Metrics for relevant response status codes.
* Following criteria is used to decide based on status code and failure reason.
* <ol>
* <li>Case 1: Status code in 2xx range: Successful Operations should contribute</li>
* <li>Case 2: Status code in 3xx range: Redirection Operations should not contribute</li>
* <li>Case 3: Status code in 4xx range: User Errors should not contribute</li>
* <li>
* Case 4: Status code is 503: Throttling Error should contribute as following:
* <ol>
* <li>Case 4.a: Ingress Over Account Limit: Should Contribute</li>
* <li>Case 4.b: Egress Over Account Limit: Should Contribute</li>
* <li>Case 4.c: TPS Over Account Limit: Should Contribute</li>
* <li>Case 4.d: Other Server Throttling: Should not contribute</li>
* </ol>
* </li>
* <li>Case 5: Status code in 5xx range other than 503: Should not contribute</li>
* </ol>
* @param statusCode
* @return
*/
private boolean shouldUpdateCSTMetrics(final int statusCode) {
return statusCode < HttpURLConnection.HTTP_MULT_CHOICE // Case 1
|| INGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.a
|| EGRESS_LIMIT_BREACH_ABBREVIATION.equals(failureReason) // Case 4.b
|| TPS_LIMIT_BREACH_ABBREVIATION.equals(failureReason); // Case 4.c
}
/** /**
* Creates a new Tracing context before entering the retry loop of a rest operation. * Creates a new Tracing context before entering the retry loop of a rest operation.
* This will ensure all rest operations have unique * This will ensure all rest operations have unique

View File

@ -26,13 +26,13 @@ private RetryReasonConstants() {
public static final String CONNECTION_TIMEOUT_JDK_MESSAGE = "connect timed out"; public static final String CONNECTION_TIMEOUT_JDK_MESSAGE = "connect timed out";
public static final String READ_TIMEOUT_JDK_MESSAGE = "Read timed out"; public static final String READ_TIMEOUT_JDK_MESSAGE = "Read timed out";
public static final String CONNECTION_RESET_MESSAGE = "Connection reset"; public static final String CONNECTION_RESET_MESSAGE = "Connection reset";
public static final String OPERATION_BREACH_MESSAGE = "Operations per second is over the account limit.";
public static final String CONNECTION_RESET_ABBREVIATION = "CR"; public static final String CONNECTION_RESET_ABBREVIATION = "CR";
public static final String CONNECTION_TIMEOUT_ABBREVIATION = "CT"; public static final String CONNECTION_TIMEOUT_ABBREVIATION = "CT";
public static final String READ_TIMEOUT_ABBREVIATION = "RT"; public static final String READ_TIMEOUT_ABBREVIATION = "RT";
public static final String INGRESS_LIMIT_BREACH_ABBREVIATION = "ING"; public static final String INGRESS_LIMIT_BREACH_ABBREVIATION = "ING";
public static final String EGRESS_LIMIT_BREACH_ABBREVIATION = "EGR"; public static final String EGRESS_LIMIT_BREACH_ABBREVIATION = "EGR";
public static final String OPERATION_LIMIT_BREACH_ABBREVIATION = "OPR"; public static final String TPS_LIMIT_BREACH_ABBREVIATION = "OPR";
public static final String OTHER_SERVER_THROTTLING_ABBREVIATION = "OTH";
public static final String UNKNOWN_HOST_EXCEPTION_ABBREVIATION = "UH"; public static final String UNKNOWN_HOST_EXCEPTION_ABBREVIATION = "UH";
public static final String IO_EXCEPTION_ABBREVIATION = "IOE"; public static final String IO_EXCEPTION_ABBREVIATION = "IOE";
public static final String SOCKET_EXCEPTION_ABBREVIATION = "SE"; public static final String SOCKET_EXCEPTION_ABBREVIATION = "SE";

View File

@ -22,10 +22,12 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_STATUS_CATEGORY_QUOTIENT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_STATUS_CATEGORY_QUOTIENT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
/** /**
* Category that can capture server-response errors for 5XX status-code. * Category that can capture server-response errors for 5XX status-code.
@ -56,9 +58,13 @@ String getAbbreviation(final Integer statusCode,
splitedServerErrorMessage)) { splitedServerErrorMessage)) {
return EGRESS_LIMIT_BREACH_ABBREVIATION; return EGRESS_LIMIT_BREACH_ABBREVIATION;
} }
if (OPERATION_BREACH_MESSAGE.equalsIgnoreCase( if (TPS_OVER_ACCOUNT_LIMIT.getErrorMessage().equalsIgnoreCase(
splitedServerErrorMessage)) { splitedServerErrorMessage)) {
return OPERATION_LIMIT_BREACH_ABBREVIATION; return TPS_LIMIT_BREACH_ABBREVIATION;
}
if (OTHER_SERVER_THROTTLING.getErrorMessage().equalsIgnoreCase(
splitedServerErrorMessage)) {
return OTHER_SERVER_THROTTLING_ABBREVIATION;
} }
return HTTP_UNAVAILABLE + ""; return HTTP_UNAVAILABLE + "";
} }

View File

@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -233,6 +234,11 @@ private AbfsRestOperation getRestOperation() throws Exception {
// mocked the response code and the response message to check different // mocked the response code and the response message to check different
// behaviour based on response code. // behaviour based on response code.
Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode(); Mockito.doReturn(responseCode).when(abfsHttpOperation).getConnResponseCode();
if (responseCode == HTTP_UNAVAILABLE) {
Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage())
.when(abfsHttpOperation)
.getStorageErrorMessage();
}
Mockito.doReturn(responseMessage) Mockito.doReturn(responseMessage)
.when(abfsHttpOperation) .when(abfsHttpOperation)
.getConnResponseMessage(); .getConnResponseMessage();

View File

@ -38,6 +38,8 @@
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.OTHER_SERVER_THROTTLING;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.STATIC_RETRY_POLICY_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
@ -50,8 +52,8 @@
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OTHER_SERVER_THROTTLING_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
@ -62,6 +64,9 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TestAbfsRestOperationMockFailures { public class TestAbfsRestOperationMockFailures {
// In these tests a request first fails with given exceptions and then succeed on retry.
// Client Side Throttling Metrics will be updated at least for retried request which succeeded.
// For original requests it will be updated only for EGR, IGR, OPR throttling.
@Test @Test
public void testClientRequestIdForConnectTimeoutRetry() throws Exception { public void testClientRequestIdForConnectTimeoutRetry() throws Exception {
@ -131,37 +136,48 @@ public void testClientRequestIdForIOERetry() throws Exception {
@Test @Test
public void testClientRequestIdFor400Retry() throws Exception { public void testClientRequestIdFor400Retry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400"); testClientRequestIdForStatusRetry(HTTP_BAD_REQUEST, "", "400", 1);
} }
@Test @Test
public void testClientRequestIdFor500Retry() throws Exception { public void testClientRequestIdFor500Retry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500"); testClientRequestIdForStatusRetry(HTTP_INTERNAL_ERROR, "", "500", 1);
} }
@Test @Test
public void testClientRequestIdFor503INGRetry() throws Exception { public void testClientRequestIdFor503INGRetry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), INGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
INGRESS_LIMIT_BREACH_ABBREVIATION); INGRESS_LIMIT_BREACH_ABBREVIATION,
2);
} }
@Test @Test
public void testClientRequestIdFor503egrRetry() throws Exception { public void testClientRequestIdFor503EGRRetry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(), EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
EGRESS_LIMIT_BREACH_ABBREVIATION); EGRESS_LIMIT_BREACH_ABBREVIATION,
2);
} }
@Test @Test
public void testClientRequestIdFor503OPRRetry() throws Exception { public void testClientRequestIdFor503OPRRetry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, testClientRequestIdForStatusRetry(
OPERATION_BREACH_MESSAGE, OPERATION_LIMIT_BREACH_ABBREVIATION); HTTP_UNAVAILABLE,
TPS_OVER_ACCOUNT_LIMIT.getErrorMessage(),
TPS_LIMIT_BREACH_ABBREVIATION,
2);
} }
@Test @Test
public void testClientRequestIdFor503OtherRetry() throws Exception { public void testClientRequestIdFor503OtherRetry() throws Exception {
testClientRequestIdForStatusRetry(HTTP_UNAVAILABLE, "Other.", "503"); testClientRequestIdForStatusRetry(
HTTP_UNAVAILABLE,
OTHER_SERVER_THROTTLING.getErrorMessage(),
OTHER_SERVER_THROTTLING_ABBREVIATION,
1);
} }
/** /**
@ -176,7 +192,6 @@ public void testClientRequestIdFor503OtherRetry() throws Exception {
* 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used * 2. Tracing header construction takes place with proper arguments based on the failure reason and retry policy used
* @throws Exception * @throws Exception
*/ */
@Test @Test
public void testRetryPolicyWithDifferentFailureReasons() throws Exception { public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
@ -210,6 +225,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
Mockito.doReturn("").when(httpOperation).getStorageErrorMessage(); Mockito.doReturn("").when(httpOperation).getStorageErrorMessage();
Mockito.doReturn("").when(httpOperation).getStorageErrorCode(); Mockito.doReturn("").when(httpOperation).getStorageErrorCode();
Mockito.doReturn("HEAD").when(httpOperation).getMethod(); Mockito.doReturn("HEAD").when(httpOperation).getMethod();
Mockito.doReturn(EGRESS_OVER_ACCOUNT_LIMIT.getErrorMessage()).when(httpOperation).getStorageErrorMessage();
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any()); Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
try { try {
@ -217,20 +233,18 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
abfsRestOperation.execute(tracingContext); abfsRestOperation.execute(tracingContext);
} catch(AbfsRestOperationException ex) { } catch(AbfsRestOperationException ex) {
Assertions.assertThat(ex.getStatusCode()) Assertions.assertThat(ex.getStatusCode())
.describedAs("Status Code must be HTTP_UNAVAILABLE(409)") .describedAs("Status Code must be HTTP_UNAVAILABLE(503)")
.isEqualTo(HTTP_UNAVAILABLE); .isEqualTo(HTTP_UNAVAILABLE);
} }
// Assert that httpOperation.processResponse was called 3 times. // Assert that httpOperation.processResponse was called 3 times.
// One for retry count 0 // One for retry count 0
// One for retry count 1 after failing with CT // One for retry count 1 after failing with CT
// One for retry count 2 after failing with 50 // One for retry count 2 after failing with 503
Mockito.verify(httpOperation, times(3)).processResponse( Mockito.verify(httpOperation, times(3)).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class)); nullable(byte[].class), nullable(int.class), nullable(int.class));
// Assert that Static Retry Policy was used after CT failure. // Primary Request Failed with CT. Static Retry Policy should be used.
// Iteration 1 failed with CT and shouldRetry was called with retry count 0
// Before iteration 2 sleep will be computed using static retry policy and retry count 1
Mockito.verify(abfsClient, Mockito.times(1)) Mockito.verify(abfsClient, Mockito.times(1))
.getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); .getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Mockito.verify(staticRetryPolicy, Mockito.times(1)) Mockito.verify(staticRetryPolicy, Mockito.times(1))
@ -245,7 +259,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
// Before iteration 3 sleep will be computed using exponential retry policy and retry count 2 // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2
// Should retry with retry count 2 will return false and no further requests will be made. // Should retry with retry count 2 will return false and no further requests will be made.
Mockito.verify(abfsClient, Mockito.times(2)) Mockito.verify(abfsClient, Mockito.times(2))
.getRetryPolicy("503"); .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.shouldRetry(1, HTTP_UNAVAILABLE); .shouldRetry(1, HTTP_UNAVAILABLE);
Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
@ -253,16 +267,17 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception {
Mockito.verify(exponentialRetryPolicy, Mockito.times(1)) Mockito.verify(exponentialRetryPolicy, Mockito.times(1))
.getRetryInterval(2); .getRetryInterval(2);
Mockito.verify(tracingContext, Mockito.times(1)) Mockito.verify(tracingContext, Mockito.times(1))
.constructHeader(httpOperation, "503", EXPONENTIAL_RETRY_POLICY_ABBREVIATION); .constructHeader(httpOperation, EGRESS_LIMIT_BREACH_ABBREVIATION, EXPONENTIAL_RETRY_POLICY_ABBREVIATION);
// Assert that intercept.updateMetrics was called only once during second Iteration // Assert that intercept.updateMetrics was called 2 times. Both the retried request fails with EGR.
Mockito.verify(intercept, Mockito.times(2)) Mockito.verify(intercept, Mockito.times(2))
.updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class)); .updateMetrics(nullable(AbfsRestOperationType.class), nullable(AbfsHttpOperation.class));
} }
private void testClientRequestIdForStatusRetry(int status, private void testClientRequestIdForStatusRetry(int status,
String serverErrorMessage, String serverErrorMessage,
String keyExpected) throws Exception { String keyExpected,
int numOfTimesCSTMetricsUpdated) throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class); AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
@ -322,11 +337,14 @@ private void testClientRequestIdForStatusRetry(int status,
abfsRestOperation.execute(tracingContext); abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(2); Assertions.assertThat(count[0]).isEqualTo(2);
Mockito.verify(intercept, Mockito.times(numOfTimesCSTMetricsUpdated)).updateMetrics(any(), any());
} }
private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
String[] abbreviationsExpected, String[] abbreviationsExpected,
int len, int numOfCTExceptions) throws Exception { int len,
int numOfCTExceptions) throws Exception {
AbfsClient abfsClient = Mockito.mock(AbfsClient.class); AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock( ExponentialRetryPolicy exponentialRetryPolicy = Mockito.mock(
ExponentialRetryPolicy.class); ExponentialRetryPolicy.class);

View File

@ -31,6 +31,7 @@
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.TPS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
@ -38,8 +39,7 @@
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.EGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.INGRESS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.IO_EXCEPTION_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_BREACH_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.TPS_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.OPERATION_LIMIT_BREACH_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.READ_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.SOCKET_EXCEPTION_ABBREVIATION;
@ -92,8 +92,8 @@ public void testIngressLimitRetryReason() {
@Test @Test
public void testOperationLimitRetryReason() { public void testOperationLimitRetryReason() {
Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, OPERATION_BREACH_MESSAGE)).isEqualTo( Assertions.assertThat(RetryReason.getAbbreviation(null, HTTP_UNAVAILABLE, TPS_OVER_ACCOUNT_LIMIT.getErrorMessage())).isEqualTo(
OPERATION_LIMIT_BREACH_ABBREVIATION TPS_LIMIT_BREACH_ABBREVIATION
); );
} }