HADOOP-17873. ABFS: Fix transient failures in ITestAbfsStreamStatistics and ITestAbfsRestOperationException (#3699)
Successor for the reverted PR #3341, using the hadoop @VisibleForTesting attribute Contributed by Sumangala Patki
This commit is contained in:
parent
5f3bc4340e
commit
2e4c5ca88f
@ -616,6 +616,7 @@
|
||||
<exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
|
||||
<exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
|
||||
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
|
||||
<exclude>**/azurebfs/ITestAbfsStreamStatistics*.java</exclude>
|
||||
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
|
||||
<exclude>**/azurebfs/commit/*.java</exclude>
|
||||
</excludes>
|
||||
@ -659,6 +660,7 @@
|
||||
<include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
|
||||
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
|
||||
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
|
||||
<include>**/azurebfs/ITestAbfsStreamStatistics*.java</include>
|
||||
<include>**/azurebfs/commit/*.java</include>
|
||||
</includes>
|
||||
</configuration>
|
||||
|
@ -22,7 +22,8 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -138,4 +139,9 @@ public String getUserAgentSuffix() {
|
||||
String suffix = ExtensionHelper.getUserAgentSuffix(adaptee, "");
|
||||
return suffix != null ? suffix : "";
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected CustomTokenProviderAdaptee getCustomTokenProviderAdaptee() {
|
||||
return adaptee;
|
||||
}
|
||||
}
|
||||
|
@ -1289,4 +1289,9 @@ public ListenableFuture<?> submit(Runnable runnable) {
|
||||
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
|
||||
Futures.addCallback(future, callback, executorService);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected AccessTokenProvider getTokenProvider() {
|
||||
return tokenProvider;
|
||||
}
|
||||
}
|
||||
|
@ -37,10 +37,12 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
|
||||
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
@ -251,6 +253,9 @@ public Hashtable<String, String> call() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
public AccessTokenProvider getAccessTokenProvider(final AzureBlobFileSystem fs) {
|
||||
return TestAbfsClient.getAccessTokenProvider(fs.getAbfsStore().getClient());
|
||||
}
|
||||
|
||||
public void loadConfiguredFileSystem() throws Exception {
|
||||
// disable auto-creation of filesystem
|
||||
|
@ -112,7 +112,10 @@ public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exce
|
||||
final AzureBlobFileSystem fs1 =
|
||||
(AzureBlobFileSystem) FileSystem.newInstance(fs.getUri(),
|
||||
config);
|
||||
RetryTestTokenProvider.ResetStatusToFirstTokenFetch();
|
||||
RetryTestTokenProvider retryTestTokenProvider
|
||||
= RetryTestTokenProvider.getCurrentRetryTestProviderInstance(
|
||||
getAccessTokenProvider(fs1));
|
||||
retryTestTokenProvider.resetStatusToFirstTokenFetch();
|
||||
|
||||
intercept(Exception.class,
|
||||
()-> {
|
||||
@ -120,10 +123,10 @@ public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exce
|
||||
});
|
||||
|
||||
// Number of retries done should be as configured
|
||||
Assert.assertTrue(
|
||||
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
|
||||
+ ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries
|
||||
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
|
||||
Assert.assertEquals(
|
||||
"Number of token fetch retries done does not match with fs.azure"
|
||||
+ ".custom.token.fetch.retry.count configured", numOfRetries,
|
||||
retryTestTokenProvider.getRetryCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -30,12 +30,12 @@
|
||||
*/
|
||||
public class RetryTestTokenProvider implements CustomTokenProviderAdaptee {
|
||||
|
||||
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||
private static boolean isThisFirstTokenFetch = true;
|
||||
public static int reTryCount = 0;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
RetryTestTokenProvider.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(RetryTestTokenProvider.class);
|
||||
// Need to track first token fetch otherwise will get counted as a retry too.
|
||||
private boolean isThisFirstTokenFetch = true;
|
||||
private int retryCount = 0;
|
||||
|
||||
@Override
|
||||
public void initialize(Configuration configuration, String accountName)
|
||||
@ -43,9 +43,13 @@ public void initialize(Configuration configuration, String accountName)
|
||||
|
||||
}
|
||||
|
||||
public static void ResetStatusToFirstTokenFetch() {
|
||||
/**
|
||||
* Clear earlier retry details and reset RetryTestTokenProvider instance to
|
||||
* state of first access token fetch call.
|
||||
*/
|
||||
public void resetStatusToFirstTokenFetch() {
|
||||
isThisFirstTokenFetch = true;
|
||||
reTryCount = 0;
|
||||
retryCount = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -53,7 +57,7 @@ public String getAccessToken() throws IOException {
|
||||
if (isThisFirstTokenFetch) {
|
||||
isThisFirstTokenFetch = false;
|
||||
} else {
|
||||
reTryCount++;
|
||||
retryCount++;
|
||||
}
|
||||
|
||||
LOG.debug("RetryTestTokenProvider: Throw an exception in fetching tokens");
|
||||
@ -64,4 +68,13 @@ public String getAccessToken() throws IOException {
|
||||
public Date getExpiryTime() {
|
||||
return new Date();
|
||||
}
|
||||
|
||||
public static RetryTestTokenProvider getCurrentRetryTestProviderInstance(
|
||||
AccessTokenProvider customTokenProvider) {
|
||||
return (RetryTestTokenProvider) ((CustomTokenProviderAdapter) customTokenProvider).getCustomTokenProviderAdaptee();
|
||||
}
|
||||
|
||||
public int getRetryCount() {
|
||||
return retryCount;
|
||||
}
|
||||
}
|
||||
|
@ -395,4 +395,8 @@ public static AbfsRestOperation getRestOp(AbfsRestOperationType type,
|
||||
url,
|
||||
requestHeaders);
|
||||
}
|
||||
|
||||
public static AccessTokenProvider getAccessTokenProvider(AbfsClient client) {
|
||||
return client.getTokenProvider();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user