HADOOP-18872: [ABFS] [BugFix] Misreporting Retry Count for Sub-sequential and Parallel Operations (#6019)

Contributed by Anuj Modi
Anuj Modi 2023-11-13 11:36:33 -08:00 committed by GitHub
parent 342e6caba1
commit 000a39ba2d
9 changed files with 498 additions and 189 deletions
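At its core, the change stops the several REST operations spawned by a single file system call from sharing one mutable TracingContext: AbfsRestOperation.execute() now clones the caller's context through createNewTracingContext() before entering its retry loop, and AbfsClient routes every operation it builds through getAbfsRestOperation() overloads so tests can intercept construction. Below is a minimal, hypothetical sketch of that pattern, using simplified stand-in classes rather than the real Hadoop ones, to show why a per-operation copy keeps retry counts from leaking between sub-sequential or parallel operations.

final class TracingContextSketch {

  // Simplified stand-in for TracingContext: only the retry count matters here.
  static final class Context {
    private int retryCount;

    Context() {
    }

    // Copy constructor, mirroring what createNewTracingContext() relies on.
    Context(Context original) {
      this.retryCount = original.retryCount;
    }

    void setRetryCount(int retryCount) {
      this.retryCount = retryCount;
    }

    int getRetryCount() {
      return retryCount;
    }
  }

  // Simplified stand-in for a REST operation: each attempt records its retry
  // number into whichever context the operation works against.
  static final class Operation {
    private final int retriesNeeded;

    Operation(int retriesNeeded) {
      this.retriesNeeded = retriesNeeded;
    }

    // Old behaviour: every operation mutates the caller's shared context.
    void executeShared(Context shared) {
      for (int retry = 0; retry <= retriesNeeded; retry++) {
        shared.setRetryCount(retry);
      }
    }

    // New behaviour: each execution first takes its own copy of the context.
    Context executeWithOwnContext(Context original) {
      Context own = new Context(original);
      for (int retry = 0; retry <= retriesNeeded; retry++) {
        own.setRetryCount(retry);
      }
      return own;
    }
  }

  public static void main(String[] args) {
    // One file system call fans out into several REST operations sharing a context.
    Context shared = new Context();
    new Operation(3).executeShared(shared);
    // Before the next operation's first attempt resets the count, anything that
    // reads the shared context sees 3 retries belonging to the previous
    // operation; with parallel operations the two loops interleave arbitrarily.
    System.out.println("shared context after op 1: " + shared.getRetryCount());

    // With a per-operation copy, each operation reports only its own retries.
    Context original = new Context();
    Context op1 = new Operation(3).executeWithOwnContext(original);
    Context op2 = new Operation(0).executeWithOwnContext(original);
    System.out.println("op1 retries: " + op1.getRetryCount());
    System.out.println("op2 retries: " + op2.getRetryCount());
    System.out.println("original left untouched: " + original.getRetryCount());
  }
}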

View File

@ -260,26 +260,25 @@ AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
return abfsUriQueryBuilder;
}
public AbfsRestOperation createFilesystem(TracingContext tracingContext) throws AzureBlobFileSystemException {
public AbfsRestOperation createFilesystem(TracingContext tracingContext)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreateFileSystem,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.CreateFileSystem,
HTTP_METHOD_PUT, url, requestHeaders);
op.execute(tracingContext);
return op;
}
public AbfsRestOperation setFilesystemProperties(final String properties, TracingContext tracingContext) throws AzureBlobFileSystemException {
public AbfsRestOperation setFilesystemProperties(final String properties,
TracingContext tracingContext) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// JDK7 does not support PATCH, so to work around the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
@ -291,9 +290,8 @@ public AbfsRestOperation setFilesystemProperties(final String properties, Tracin
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.SetFileSystemProperties,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
@ -316,9 +314,8 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.ListPaths,
this,
HTTP_METHOD_GET,
url,
requestHeaders);
@ -333,9 +330,8 @@ public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext)
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
this,
HTTP_METHOD_HEAD,
url,
requestHeaders);
@ -350,9 +346,8 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) throws
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.DeleteFileSystem,
this,
HTTP_METHOD_DELETE,
url,
requestHeaders);
@ -396,9 +391,8 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.CreatePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
@ -431,9 +425,8 @@ public AbfsRestOperation acquireLease(final String path, int duration, TracingCo
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
@ -451,9 +444,8 @@ public AbfsRestOperation renewLease(final String path, final String leaseId,
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
@ -471,9 +463,8 @@ public AbfsRestOperation releaseLease(final String path,
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
@ -491,9 +482,8 @@ public AbfsRestOperation breakLease(final String path,
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.LeasePath,
this,
HTTP_METHOD_POST,
url,
requestHeaders);
@ -646,9 +636,8 @@ private boolean checkIsDir(AbfsHttpOperation result) {
@VisibleForTesting
AbfsRestOperation createRenameRestOperation(URL url, List<AbfsHttpHeader> requestHeaders) {
AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
@ -766,7 +755,8 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperationForAppend(AbfsRestOperationType.Append,
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
requestHeaders,
@ -801,7 +791,7 @@ public AbfsRestOperation append(final String path, final byte[] buffer,
if (reqParams.isAppendBlob()
&& appendSuccessCheckOp(op, path,
(reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
final AbfsRestOperation successOp = getAbfsRestOperationForAppend(
final AbfsRestOperation successOp = getAbfsRestOperation(
AbfsRestOperationType.Append,
HTTP_METHOD_PUT,
url,
@ -819,38 +809,6 @@ && appendSuccessCheckOp(op, path,
return op;
}
/**
* Returns the rest operation for append.
* @param operationType The AbfsRestOperationType.
* @param httpMethod specifies the httpMethod.
* @param url specifies the url.
* @param requestHeaders This includes the list of request headers.
* @param buffer The buffer to write into.
* @param bufferOffset The buffer offset.
* @param bufferLength The buffer Length.
* @param sasTokenForReuse The sasToken.
* @return AbfsRestOperation op.
*/
@VisibleForTesting
AbfsRestOperation getAbfsRestOperationForAppend(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders,
final byte[] buffer,
final int bufferOffset,
final int bufferLength,
final String sasTokenForReuse) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders,
buffer,
bufferOffset,
bufferLength, sasTokenForReuse);
}
/**
* Returns true if the status code lies in the range of user error.
* @param responseStatusCode http response status code.
@ -907,9 +865,8 @@ public AbfsRestOperation flush(final String path, final long position,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.Flush,
this,
HTTP_METHOD_PUT,
url,
requestHeaders, sasTokenForReuse);
@ -934,9 +891,8 @@ public AbfsRestOperation setPathProperties(final String path, final String prope
appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.SetPathProperties,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
@ -963,9 +919,8 @@ public AbfsRestOperation getPathStatus(final String path, final boolean includeP
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetPathStatus,
this,
HTTP_METHOD_HEAD,
url,
requestHeaders);
@ -988,9 +943,8 @@ public AbfsRestOperation read(final String path, final long position, final byte
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.ReadFile,
this,
HTTP_METHOD_GET,
url,
requestHeaders,
@ -1063,9 +1017,8 @@ public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
&& DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
// Server has returned HTTP 404, which means path no longer
// exists. Assuming delete result to be idempotent, return success.
final AbfsRestOperation successOp = new AbfsRestOperation(
final AbfsRestOperation successOp = getAbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
op.getUrl(),
op.getRequestHeaders());
@ -1098,9 +1051,8 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S
appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.SetOwner,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
@ -1124,9 +1076,8 @@ public AbfsRestOperation setPermission(final String path, final String permissio
appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.SetPermissions,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
@ -1159,9 +1110,8 @@ public AbfsRestOperation setAcl(final String path, final String aclSpecString, f
appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.SetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
@ -1184,9 +1134,8 @@ public AbfsRestOperation getAclStatus(final String path, final boolean useUPN,
appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,
requestHeaders);
@ -1211,9 +1160,11 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra
abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder);
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CheckAccess, this,
AbfsHttpConstants.HTTP_METHOD_HEAD, url, createDefaultHeaders());
AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.CheckAccess,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,
createDefaultHeaders());
op.execute(tracingContext);
return op;
}
@ -1238,7 +1189,7 @@ public static String getDirectoryQueryParameter(final String path) {
}
/**
* If configured for SAS AuthType, appends SAS token to queryBuilder
* If configured for SAS AuthType, appends SAS token to queryBuilder.
* @param path
* @param operation
* @param queryBuilder
@ -1250,7 +1201,7 @@ private String appendSASTokenToQuery(String path, String operation, AbfsUriQuery
}
/**
* If configured for SAS AuthType, appends SAS token to queryBuilder
* If configured for SAS AuthType, appends SAS token to queryBuilder.
* @param path
* @param operation
* @param queryBuilder
@ -1459,4 +1410,82 @@ public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callba
protected AccessTokenProvider getTokenProvider() {
return tokenProvider;
}
/**
* Creates an AbfsRestOperation with additional parameters for buffer and SAS token.
*
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @param buffer The byte buffer containing data for the operation.
* @param bufferOffset The offset within the buffer where the data starts.
* @param bufferLength The length of the data within the buffer.
* @param sasTokenForReuse The SAS token for reusing authentication.
* @return An AbfsRestOperation instance.
*/
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders,
final byte[] buffer,
final int bufferOffset,
final int bufferLength,
final String sasTokenForReuse) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders,
buffer,
bufferOffset,
bufferLength,
sasTokenForReuse);
}
/**
* Creates an AbfsRestOperation with basic parameters and no buffer or SAS token.
*
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @return An AbfsRestOperation instance.
*/
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders
);
}
/**
* Creates an AbfsRestOperation with parameters including request headers and SAS token.
*
* @param operationType The type of the operation.
* @param httpMethod The HTTP method of the operation.
* @param url The URL associated with the operation.
* @param requestHeaders The list of HTTP headers for the request.
* @param sasTokenForReuse The SAS token for reusing authentication.
* @return An AbfsRestOperation instance.
*/
AbfsRestOperation getAbfsRestOperation(final AbfsRestOperationType operationType,
final String httpMethod,
final URL url,
final List<AbfsHttpHeader> requestHeaders,
final String sasTokenForReuse) {
return new AbfsRestOperation(
operationType,
this,
httpMethod,
url,
requestHeaders, sasTokenForReuse);
}
}

View File

@ -82,6 +82,11 @@ public class AbfsRestOperation {
*/
private String failureReason;
/**
* This variable stores the tracing context used for the last rest operation.
*/
private TracingContext lastUsedTracingContext;
/**
* Checks if there is non-null HTTP response.
* @return true if there is a non-null HTTP response from the ABFS call.
@ -197,10 +202,13 @@ String getSasToken() {
public void execute(TracingContext tracingContext)
throws AzureBlobFileSystemException {
// Since this might be a sub-sequential or parallel rest operation
// triggered by a single file system call, use a new tracing context.
lastUsedTracingContext = createNewTracingContext(tracingContext);
try {
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
AbfsStatistic.getStatNameFromHttpCall(method),
() -> completeExecute(tracingContext));
() -> completeExecute(lastUsedTracingContext));
} catch (AzureBlobFileSystemException aze) {
throw aze;
} catch (IOException e) {
@ -214,7 +222,7 @@ public void execute(TracingContext tracingContext)
* HTTP operations.
* @param tracingContext TracingContext instance to track correlation IDs
*/
private void completeExecute(TracingContext tracingContext)
void completeExecute(TracingContext tracingContext)
throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests
String latencyHeader = getClientLatency();
@ -409,4 +417,25 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
abfsCounters.incrementCounter(statistic, value);
}
}
/**
* Creates a new tracing context before entering the retry loop of a rest operation.
* This ensures every rest operation has a unique tracing context
* that is used for all of its retries.
* @param tracingContext the original tracingContext.
* @return a new tracingContext object created from the original one.
*/
@VisibleForTesting
public TracingContext createNewTracingContext(final TracingContext tracingContext) {
return new TracingContext(tracingContext);
}
/**
* Returns the tracing context used for the last rest operation made.
* @return tracingContext lastUsedTracingContext.
*/
@VisibleForTesting
public final TracingContext getLastTracingContext() {
return lastUsedTracingContext;
}
}
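The createNewTracingContext() hook and getLastTracingContext() getter added above are what the updated tests rely on: spying an operation and stubbing the hook injects a deterministic context, and collecting the contexts produced by each execution verifies that no two executions shared one. Below is a self-contained sketch of that test pattern, built on hypothetical stand-in classes rather than the real AbfsRestOperation, and assuming JUnit 4, Mockito and AssertJ on the classpath.

import java.util.HashSet;
import java.util.Set;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

public class TracingContextHookSketch {

  // Hypothetical stand-in for TracingContext.
  public static class Context {
  }

  // Hypothetical stand-in for AbfsRestOperation with the new factory hook.
  public static class Operation {
    private Context last;

    // Analogous to AbfsRestOperation#createNewTracingContext.
    public Context createNewTracingContext(Context original) {
      return new Context();
    }

    public void execute(Context original) {
      last = createNewTracingContext(original);
      // ... the retry loop would run against 'last' here ...
    }

    // Analogous to AbfsRestOperation#getLastTracingContext.
    public Context getLastTracingContext() {
      return last;
    }
  }

  @Test
  public void eachExecutionGetsItsOwnContext() {
    Context original = new Context();
    Set<Context> seen = new HashSet<>();
    for (int i = 0; i < 3; i++) {
      Operation op = new Operation();
      op.execute(original);
      // No two executions may reuse the same context instance.
      Assertions.assertThat(seen).doesNotContain(op.getLastTracingContext());
      seen.add(op.getLastTracingContext());
    }
  }

  @Test
  public void testCanPinAKnownContext() {
    // Spy the operation and stub the hook, mirroring how the updated tests
    // call doReturn(tracingContext).when(op).createNewTracingContext(any()).
    Operation op = Mockito.spy(new Operation());
    Context pinned = new Context();
    Mockito.doReturn(pinned).when(op).createNewTracingContext(Mockito.any());
    op.execute(new Context());
    Assertions.assertThat(op.getLastTracingContext()).isSameAs(pinned);
  }
}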

View File

@ -29,10 +29,14 @@
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
@ -61,7 +65,6 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test delete operation.
*/
@ -257,14 +260,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
// Case 2: Mimic retried case
// Idempotency check on Delete always returns success
AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
AbfsRestOperation idempotencyRetOp = Mockito.spy(ITestAbfsClient.getRestOp(
DeletePath, mockClient, HTTP_METHOD_DELETE,
ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
ITestAbfsClient.getTestRequestHeaders(mockClient));
ITestAbfsClient.getTestRequestHeaders(mockClient)));
idempotencyRetOp.hardSetResult(HTTP_OK);
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
TracingContext tracingContext = getTestTracingContext(fs, false);
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
.thenCallRealMethod();
@ -283,4 +287,25 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
mockStore.delete(new Path("/NonExistingPath"), false, getTestTracingContext(fs, false));
}
@Test
public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext()
throws Exception {
Configuration configuration = getRawConfiguration();
AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
Mockito.doReturn(spiedStore).when(fs).getAbfsStore();
spiedStore.setClient(spiedClient);
fs.mkdirs(new Path("/testDir"));
fs.create(new Path("/testDir/file1"));
fs.create(new Path("/testDir/file2"));
AbfsClientTestUtil.hookOnRestOpsForTracingContextSingularity(spiedClient);
fs.delete(new Path("/testDir"), true);
fs.close();
}
}

View File

@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@ -28,6 +29,8 @@
import java.util.concurrent.Future;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -36,16 +39,30 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rename;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
/**
* Test listStatus operation.
@ -53,6 +70,7 @@
public class ITestAzureBlobFileSystemListStatus extends
AbstractAbfsIntegrationTest {
private static final int TEST_FILES_NUMBER = 6000;
private static final String TEST_CONTINUATION_TOKEN = "continuation";
public ITestAzureBlobFileSystemListStatus() throws Exception {
super();
@ -62,34 +80,105 @@ public ITestAzureBlobFileSystemListStatus() throws Exception {
public void testListPath() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(AZURE_LIST_MAX_RESULTS, "5000");
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config);
final List<Future<Void>> tasks = new ArrayList<>();
try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config)) {
final List<Future<Void>> tasks = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
final Path fileName = new Path("/test" + i);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
touch(fileName);
return null;
}
};
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
final Path fileName = new Path("/test" + i);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
touch(fileName);
return null;
}
};
tasks.add(es.submit(callable));
tasks.add(es.submit(callable));
}
for (Future<Void> task : tasks) {
task.get();
}
es.shutdownNow();
fs.registerListener(
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
}
}
for (Future<Void> task : tasks) {
task.get();
}
/**
* Test to verify that each paginated call to ListBlobs uses a new tracing context.
* @throws Exception
*/
@Test
public void testListPathTracingContext() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final AzureBlobFileSystem spiedFs = Mockito.spy(fs);
final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
final TracingContext spiedTracingContext = Mockito.spy(
new TracingContext(
fs.getClientCorrelationId(), fs.getFileSystemId(),
FSOperationType.LISTSTATUS, true, TracingHeaderFormat.ALL_ID_FORMAT, null));
es.shutdownNow();
fs.registerListener(
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
spiedStore.setClient(spiedClient);
spiedFs.setWorkingDirectory(new Path("/"));
AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient,
(httpOperation) -> {
ListResultEntrySchema entry = new ListResultEntrySchema()
.withName("a")
.withIsDirectory(true);
List<ListResultEntrySchema> paths = new ArrayList<>();
paths.add(entry);
paths.clear();
entry = new ListResultEntrySchema()
.withName("abc.txt")
.withIsDirectory(false);
paths.add(entry);
ListResultSchema schema1 = new ListResultSchema().withPaths(paths);
ListResultSchema schema2 = new ListResultSchema().withPaths(paths);
when(httpOperation.getListResultSchema()).thenReturn(schema1)
.thenReturn(schema2);
when(httpOperation.getResponseHeader(
HttpHeaderConfigurations.X_MS_CONTINUATION))
.thenReturn(TEST_CONTINUATION_TOKEN)
.thenReturn(EMPTY_STRING);
Stubber stubber = Mockito.doThrow(
new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
stubber.doNothing().when(httpOperation).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));
when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_OK);
return httpOperation;
});
List<FileStatus> fileStatuses = new ArrayList<>();
spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);
// Assert that two paginated ListPath calls were made:
// 1. Without continuation token
Mockito.verify(spiedClient, times(1)).listPath(
"/", false,
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
null, spiedTracingContext);
// 2. With continuation token
Mockito.verify(spiedClient, times(1)).listPath(
"/", false,
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
TEST_CONTINUATION_TOKEN, spiedTracingContext);
// Assert that none of the API calls used the same tracing header.
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any());
}
/**

View File

@ -83,44 +83,46 @@ public void checkCorrelationConfigValidation(String clientCorrelationId,
boolean includeInHeader) throws Exception {
Configuration conf = getRawConfiguration();
conf.set(FS_AZURE_CLIENT_CORRELATIONID, clientCorrelationId);
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf);
try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf)) {
String correlationID = fs.getClientCorrelationId();
if (includeInHeader) {
Assertions.assertThat(correlationID)
.describedAs("Correlation ID should match config when valid")
.isEqualTo(clientCorrelationId);
} else {
Assertions.assertThat(correlationID)
.describedAs("Invalid ID should be replaced with empty string")
.isEqualTo(EMPTY_STRING);
}
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fs.getFileSystemId(), FSOperationType.TEST_OP,
TracingHeaderFormat.ALL_ID_FORMAT, null);
boolean isNamespaceEnabled = fs.getIsNamespaceEnabled(tracingContext);
String path = getRelativePath(new Path("/testDir"));
String permission = isNamespaceEnabled
? getOctalNotation(FsPermission.getDirDefault())
: null;
String umask = isNamespaceEnabled
? getOctalNotation(FsPermission.getUMask(fs.getConf()))
: null;
//request should not fail for invalid clientCorrelationID
AbfsRestOperation op = fs.getAbfsClient()
.createPath(path, false, true, permission, umask, false, null,
tracingContext);
int statusCode = op.getResult().getStatusCode();
Assertions.assertThat(statusCode).describedAs("Request should not fail")
.isEqualTo(HTTP_CREATED);
String requestHeader = op.getResult().getClientRequestId().replace("[", "")
.replace("]", "");
Assertions.assertThat(requestHeader)
.describedAs("Client Request Header should match TracingContext")
.isEqualTo(op.getLastTracingContext().getHeader());
String correlationID = fs.getClientCorrelationId();
if (includeInHeader) {
Assertions.assertThat(correlationID)
.describedAs("Correlation ID should match config when valid")
.isEqualTo(clientCorrelationId);
} else {
Assertions.assertThat(correlationID)
.describedAs("Invalid ID should be replaced with empty string")
.isEqualTo(EMPTY_STRING);
}
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fs.getFileSystemId(), FSOperationType.TEST_OP,
TracingHeaderFormat.ALL_ID_FORMAT, null);
boolean isNamespaceEnabled = fs.getIsNamespaceEnabled(tracingContext);
String path = getRelativePath(new Path("/testDir"));
String permission = isNamespaceEnabled
? getOctalNotation(FsPermission.getDirDefault())
: null;
String umask = isNamespaceEnabled
? getOctalNotation(FsPermission.getUMask(fs.getConf()))
: null;
//request should not fail for invalid clientCorrelationID
AbfsRestOperation op = fs.getAbfsClient()
.createPath(path, false, true, permission, umask, false, null,
tracingContext);
int statusCode = op.getResult().getStatusCode();
Assertions.assertThat(statusCode).describedAs("Request should not fail")
.isEqualTo(HTTP_CREATED);
String requestHeader = op.getResult().getClientRequestId().replace("[", "")
.replace("]", "");
Assertions.assertThat(requestHeader)
.describedAs("Client Request Header should match TracingContext")
.isEqualTo(tracingContext.getHeader());
}
@Ignore

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.assertj.core.api.Assertions;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.util.functional.FunctionRaisingIOE;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
/**
* Utility class to help define mock behaviour on AbfsClient and AbfsRestOperation
* objects, which are protected inside the services package.
*/
public final class AbfsClientTestUtil {
private AbfsClientTestUtil() {
}
public static void setMockAbfsRestOperationForListPathOperation(
final AbfsClient spiedClient,
FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> functionRaisingIOE)
throws Exception {
ExponentialRetryPolicy retryPolicy = Mockito.mock(ExponentialRetryPolicy.class);
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
AbfsRestOperationType.ListPaths,
spiedClient,
HTTP_METHOD_GET,
null,
new ArrayList<>()
));
Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation(
eq(AbfsRestOperationType.ListPaths), any(), any(), any());
addGeneralMockBehaviourToAbfsClient(spiedClient, retryPolicy);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
functionRaisingIOE.apply(httpOperation);
}
/**
* Adds general mock behaviour to AbfsRestOperation and AbfsHttpOperation
* to avoid NPEs. The mocks prevent any real network call from being made and
* return the relevant exception or return value directly.
* @param abfsRestOperation to be mocked
* @param httpOperation to be mocked
* @throws IOException
*/
public static void addGeneralMockBehaviourToRestOpAndHttpOp(final AbfsRestOperation abfsRestOperation,
final AbfsHttpOperation httpOperation) throws IOException {
HttpURLConnection httpURLConnection = Mockito.mock(HttpURLConnection.class);
Mockito.doNothing().when(httpURLConnection)
.setRequestProperty(nullable(String.class), nullable(String.class));
Mockito.doReturn(httpURLConnection).when(httpOperation).getConnection();
Mockito.doReturn("").when(abfsRestOperation).getClientLatency();
Mockito.doReturn(httpOperation).when(abfsRestOperation).createHttpOperation();
}
/**
* Adds general mock behaviour to AbfsClient to avoid NPEs.
* The mocks prevent any real network call from being made and return the relevant exception or return value directly.
* @param abfsClient to be mocked
* @param retryPolicy to be mocked
* @throws IOException
*/
public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient,
final ExponentialRetryPolicy retryPolicy) throws IOException {
Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
Mockito.doReturn("").when(abfsClient).getAccessToken();
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
Mockito.doReturn(intercept).when(abfsClient).getIntercept();
Mockito.doNothing()
.when(intercept)
.sendingRequest(any(), nullable(AbfsCounters.class));
Mockito.doNothing().when(intercept).updateMetrics(any(), any());
Mockito.doReturn(retryPolicy).when(abfsClient).getRetryPolicy();
Mockito.doReturn(true)
.when(retryPolicy)
.shouldRetry(nullable(Integer.class), nullable(Integer.class));
Mockito.doReturn(false).when(retryPolicy).shouldRetry(0, HTTP_OK);
Mockito.doReturn(false).when(retryPolicy).shouldRetry(1, HTTP_OK);
Mockito.doReturn(false).when(retryPolicy).shouldRetry(2, HTTP_OK);
}
public static void hookOnRestOpsForTracingContextSingularity(AbfsClient client) {
Set<TracingContext> tracingContextSet = new HashSet<>();
ReentrantLock lock = new ReentrantLock();
Answer answer = new Answer() {
@Override
public Object answer(final InvocationOnMock invocationOnMock)
throws Throwable {
AbfsRestOperation op = Mockito.spy((AbfsRestOperation) invocationOnMock.callRealMethod());
Mockito.doAnswer(completeExecuteInvocation -> {
lock.lock();
try {
TracingContext context = completeExecuteInvocation.getArgument(0);
Assertions.assertThat(tracingContextSet).doesNotContain(context);
tracingContextSet.add(context);
} finally {
lock.unlock();
}
return completeExecuteInvocation.callRealMethod();
}).when(op).completeExecute(Mockito.any(TracingContext.class));
return op;
}
};
Mockito.doAnswer(answer)
.when(client)
.getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList(),
Mockito.nullable(byte[].class), Mockito.anyInt(), Mockito.anyInt(),
Mockito.nullable(String.class));
Mockito.doAnswer(answer)
.when(client)
.getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList());
Mockito.doAnswer(answer)
.when(client)
.getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList(),
Mockito.nullable(String.class));
}
}

View File

@ -58,6 +58,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -592,7 +593,7 @@ public void testExpectHundredContinue() throws Exception {
// Mock the restOperation for the client.
Mockito.doReturn(op)
.when(testClient)
.getAbfsRestOperationForAppend(Mockito.any(),
.getAbfsRestOperation(eq(AbfsRestOperationType.Append),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.nullable(int.class), Mockito.nullable(int.class),
Mockito.any());

View File

@ -290,6 +290,7 @@ public void testExpectHundredContinue() throws Exception {
TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
"abcde", FSOperationType.APPEND,
TracingHeaderFormat.ALL_ID_FORMAT, null));
Mockito.doReturn(tracingContext).when(op).createNewTracingContext(Mockito.any());
switch (errorType) {
case WRITE:

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
@ -39,7 +37,8 @@
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.EGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INGRESS_OVER_ACCOUNT_LIMIT;
import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToAbfsClient;
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.addGeneralMockBehaviourToRestOpAndHttpOp;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_ABBREVIATION;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_RESET_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION;
@ -166,7 +165,7 @@ private void testClientRequestIdForStatusRetry(int status,
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy retryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
addMockBehaviourToAbfsClient(abfsClient, retryPolicy);
addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
@ -178,7 +177,7 @@ private void testClientRequestIdForStatusRetry(int status,
));
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
Mockito.doNothing()
.doNothing()
@ -202,6 +201,8 @@ private void testClientRequestIdForStatusRetry(int status,
TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn(tracingContext)
.when(abfsRestOperation).createNewTracingContext(any());
int[] count = new int[1];
count[0] = 0;
@ -225,7 +226,7 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
AbfsClient abfsClient = Mockito.mock(AbfsClient.class);
ExponentialRetryPolicy retryPolicy = Mockito.mock(
ExponentialRetryPolicy.class);
addMockBehaviourToAbfsClient(abfsClient, retryPolicy);
addGeneralMockBehaviourToAbfsClient(abfsClient, retryPolicy);
AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
@ -237,7 +238,7 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
));
AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class);
addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation);
Stubber stubber = Mockito.doThrow(exceptions[0]);
for (int iteration = 1; iteration < len; iteration++) {
@ -253,6 +254,7 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
TracingContext tracingContext = Mockito.mock(TracingContext.class);
Mockito.doNothing().when(tracingContext).setRetryCount(nullable(int.class));
Mockito.doReturn(tracingContext).when(abfsRestOperation).createNewTracingContext(any());
int[] count = new int[1];
count[0] = 0;
@ -268,35 +270,4 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions,
abfsRestOperation.execute(tracingContext);
Assertions.assertThat(count[0]).isEqualTo(len + 1);
}
private void addMockBehaviourToRestOpAndHttpOp(final AbfsRestOperation abfsRestOperation,
final AbfsHttpOperation httpOperation) throws IOException {
HttpURLConnection httpURLConnection = Mockito.mock(HttpURLConnection.class);
Mockito.doNothing()
.when(httpURLConnection)
.setRequestProperty(nullable(String.class), nullable(String.class));
Mockito.doReturn(httpURLConnection).when(httpOperation).getConnection();
Mockito.doReturn("").when(abfsRestOperation).getClientLatency();
Mockito.doReturn(httpOperation).when(abfsRestOperation).createHttpOperation();
}
private void addMockBehaviourToAbfsClient(final AbfsClient abfsClient,
final ExponentialRetryPolicy retryPolicy) throws IOException {
Mockito.doReturn(OAuth).when(abfsClient).getAuthType();
Mockito.doReturn("").when(abfsClient).getAccessToken();
AbfsThrottlingIntercept intercept = Mockito.mock(
AbfsThrottlingIntercept.class);
Mockito.doReturn(intercept).when(abfsClient).getIntercept();
Mockito.doNothing()
.when(intercept)
.sendingRequest(any(), nullable(AbfsCounters.class));
Mockito.doNothing().when(intercept).updateMetrics(any(), any());
Mockito.doReturn(retryPolicy).when(abfsClient).getRetryPolicy();
Mockito.doReturn(true)
.when(retryPolicy)
.shouldRetry(nullable(Integer.class), nullable(Integer.class));
Mockito.doReturn(false).when(retryPolicy).shouldRetry(1, HTTP_OK);
Mockito.doReturn(false).when(retryPolicy).shouldRetry(2, HTTP_OK);
}
}