HADOOP-16660. ABFS: Make RetryCount in ExponentialRetryPolicy Configurable.
Contributed by Sneha Vijayarajan.
This commit is contained in:
parent
9e69628f55
commit
82ad9b549f
@ -1129,7 +1129,9 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||||||
abfsConfiguration.getRawConfiguration());
|
abfsConfiguration.getRawConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider, abfsPerfTracker);
|
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||||
|
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||||
|
tokenProvider, abfsPerfTracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getOctalNotation(FsPermission fsPermission) {
|
private String getOctalNotation(FsPermission fsPermission) {
|
||||||
|
@ -130,8 +130,11 @@ public class AbfsRestOperation {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int retryCount = 0;
|
int retryCount = 0;
|
||||||
|
LOG.debug("First execution of REST operation - {}", operationType);
|
||||||
while (!executeHttpOperation(retryCount++)) {
|
while (!executeHttpOperation(retryCount++)) {
|
||||||
try {
|
try {
|
||||||
|
LOG.debug("Retrying REST operation {}. RetryCount = {}",
|
||||||
|
operationType, retryCount);
|
||||||
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
||||||
} catch (InterruptedException ex) {
|
} catch (InterruptedException ex) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -25,11 +25,6 @@ import java.net.HttpURLConnection;
|
|||||||
* Retry policy used by AbfsClient.
|
* Retry policy used by AbfsClient.
|
||||||
* */
|
* */
|
||||||
public class ExponentialRetryPolicy {
|
public class ExponentialRetryPolicy {
|
||||||
/**
|
|
||||||
* Represents the default number of retry attempts.
|
|
||||||
*/
|
|
||||||
private static final int DEFAULT_CLIENT_RETRY_COUNT = 30;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the default amount of time used when calculating a random delta in the exponential
|
* Represents the default amount of time used when calculating a random delta in the exponential
|
||||||
* delay between retries.
|
* delay between retries.
|
||||||
@ -86,8 +81,10 @@ public class ExponentialRetryPolicy {
|
|||||||
/**
|
/**
|
||||||
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
|
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
|
||||||
*/
|
*/
|
||||||
public ExponentialRetryPolicy() {
|
public ExponentialRetryPolicy(final int maxIoRetries) {
|
||||||
this(DEFAULT_CLIENT_RETRY_COUNT, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_CLIENT_BACKOFF);
|
|
||||||
|
this(maxIoRetries, DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF,
|
||||||
|
DEFAULT_CLIENT_BACKOFF);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,12 +20,17 @@ package org.apache.hadoop.fs.azurebfs;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.oauth2.RetryTestTokenProvider;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the AbfsRestOperationException error message format.
|
* Verify the AbfsRestOperationException error message format.
|
||||||
* */
|
* */
|
||||||
@ -72,4 +77,40 @@ public class ITestAbfsRestOperationException extends AbstractAbfsIntegrationTest
|
|||||||
&& errorFields[5].contains("Time"));
|
&& errorFields[5].contains("Time"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestRetryConfig() throws Exception {
|
||||||
|
testRetryLogic(0);
|
||||||
|
testRetryLogic(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRetryLogic(int numOfRetries) throws Exception {
|
||||||
|
AzureBlobFileSystem fs = this.getFileSystem();
|
||||||
|
|
||||||
|
Configuration config = new Configuration(this.getRawConfiguration());
|
||||||
|
String accountName = config.get("fs.azure.abfs.account.name");
|
||||||
|
// Setup to configure custom token provider
|
||||||
|
config.set("fs.azure.account.auth.type." + accountName, "Custom");
|
||||||
|
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
|
||||||
|
+ ".azurebfs.oauth2.RetryTestTokenProvider");
|
||||||
|
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
|
||||||
|
// Stop filesystem creation as it will lead to calls to store.
|
||||||
|
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");
|
||||||
|
|
||||||
|
final AzureBlobFileSystem fs1 =
|
||||||
|
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
||||||
|
config);
|
||||||
|
RetryTestTokenProvider.ResetStatusToFirstTokenFetch();
|
||||||
|
|
||||||
|
intercept(Exception.class,
|
||||||
|
()-> {
|
||||||
|
fs1.getFileStatus(new Path("/"));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Number of retries done should be as configured
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||||
|
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
|
||||||
|
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||||
|
}
|
||||||
}
|
}
|
@ -0,0 +1,67 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Date;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Token provider which should throw exception and trigger retries
|
||||||
|
*/
|
||||||
|
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||||
|
|
||||||
|
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||||
|
private static boolean isThisFirstTokenFetch = true;
|
||||||
|
public static int reTryCount = 0;
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(RetryTestTokenProvider.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(Configuration configuration, String accountName)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void ResetStatusToFirstTokenFetch() {
|
||||||
|
isThisFirstTokenFetch = true;
|
||||||
|
reTryCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getAccessToken() throws IOException {
|
||||||
|
if (isThisFirstTokenFetch) {
|
||||||
|
isThisFirstTokenFetch = false;
|
||||||
|
} else {
|
||||||
|
reTryCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
||||||
|
throw new IOException("test exception");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Date getExpiryTime() {
|
||||||
|
return new Date();
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user