Hadoop 17132. ABFS: Fix Rename and Delete Idempotency check trigger
- Contributed by Sneha Vijayarajan
This commit is contained in:
parent
f24e2ec487
commit
18ca80331c
@ -336,10 +336,19 @@ public class AbfsClient implements Closeable {
|
||||
url,
|
||||
requestHeaders);
|
||||
Instant renameRequestStartTime = Instant.now();
|
||||
op.execute();
|
||||
|
||||
if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||
return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
|
||||
try {
|
||||
op.execute();
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
|
||||
renameRequestStartTime, op, destination);
|
||||
if (idempotencyOp.getResult().getStatusCode()
|
||||
== op.getResult().getStatusCode()) {
|
||||
// idempotency did not return different result
|
||||
// throw back the exception
|
||||
throw e;
|
||||
} else {
|
||||
return idempotencyOp;
|
||||
}
|
||||
}
|
||||
|
||||
return op;
|
||||
@ -369,14 +378,21 @@ public class AbfsClient implements Closeable {
|
||||
// exists. Check on destination status and if it has a recent LMT timestamp.
|
||||
// If yes, return success, else fall back to original rename request failure response.
|
||||
|
||||
final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
|
||||
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
|
||||
String lmt = destStatusOp.getResult().getResponseHeader(
|
||||
HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
try {
|
||||
final AbfsRestOperation destStatusOp = getPathStatus(destination,
|
||||
false);
|
||||
if (destStatusOp.getResult().getStatusCode()
|
||||
== HttpURLConnection.HTTP_OK) {
|
||||
String lmt = destStatusOp.getResult().getResponseHeader(
|
||||
HttpHeaderConfigurations.LAST_MODIFIED);
|
||||
|
||||
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
||||
return destStatusOp;
|
||||
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
||||
return destStatusOp;
|
||||
}
|
||||
}
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
// GetFileStatus on the destination failed, return original op
|
||||
return op;
|
||||
}
|
||||
}
|
||||
|
||||
@ -570,10 +586,18 @@ public class AbfsClient implements Closeable {
|
||||
HTTP_METHOD_DELETE,
|
||||
url,
|
||||
requestHeaders);
|
||||
try {
|
||||
op.execute();
|
||||
|
||||
if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||
return deleteIdempotencyCheckOp(op);
|
||||
} catch (AzureBlobFileSystemException e) {
|
||||
final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
|
||||
if (idempotencyOp.getResult().getStatusCode()
|
||||
== op.getResult().getStatusCode()) {
|
||||
// idempotency did not return different result
|
||||
// throw back the exception
|
||||
throw e;
|
||||
} else {
|
||||
return idempotencyOp;
|
||||
}
|
||||
}
|
||||
|
||||
return op;
|
||||
@ -822,7 +846,8 @@ public class AbfsClient implements Closeable {
|
||||
return createRequestUrl(EMPTY_STRING, query);
|
||||
}
|
||||
|
||||
private URL createRequestUrl(final String path, final String query)
|
||||
@VisibleForTesting
|
||||
protected URL createRequestUrl(final String path, final String query)
|
||||
throws AzureBlobFileSystemException {
|
||||
final String base = baseUrl.toString();
|
||||
String encodedPath = path;
|
||||
|
@ -24,6 +24,7 @@ import java.net.URL;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -170,7 +171,8 @@ public class AbfsRestOperation {
|
||||
* Executes the REST operation with retry, by issuing one or more
|
||||
* HTTP operations.
|
||||
*/
|
||||
void execute() throws AzureBlobFileSystemException {
|
||||
@VisibleForTesting
|
||||
public void execute() throws AzureBlobFileSystemException {
|
||||
// see if we have latency reports from the previous requests
|
||||
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
|
||||
if (latencyHeader != null && !latencyHeader.isEmpty()) {
|
||||
@ -181,8 +183,9 @@ public class AbfsRestOperation {
|
||||
|
||||
retryCount = 0;
|
||||
LOG.debug("First execution of REST operation - {}", operationType);
|
||||
while (!executeHttpOperation(retryCount++)) {
|
||||
while (!executeHttpOperation(retryCount)) {
|
||||
try {
|
||||
++retryCount;
|
||||
LOG.debug("Retrying REST operation {}. RetryCount = {}",
|
||||
operationType, retryCount);
|
||||
Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount));
|
||||
|
@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
@ -38,9 +39,12 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ -168,7 +172,8 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||
// Set retryCount to non-zero
|
||||
when(op.isARetriedRequest()).thenReturn(true);
|
||||
|
||||
// Mock instance of Http Operation response. This will return HTTP:Not Found
|
||||
// Case 1: Mock instance of Http Operation response. This will return
|
||||
// HTTP:Not Found
|
||||
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
||||
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
||||
|
||||
@ -181,6 +186,64 @@ public class ITestAzureBlobFileSystemDelete extends
|
||||
.describedAs(
|
||||
"Delete is considered idempotent by default and should return success.")
|
||||
.isEqualTo(HTTP_OK);
|
||||
|
||||
// Case 2: Mock instance of Http Operation response. This will return
|
||||
// HTTP:Bad Request
|
||||
AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
|
||||
when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);
|
||||
|
||||
// Mock delete response to 400
|
||||
when(op.getResult()).thenReturn(http400Op);
|
||||
|
||||
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
||||
.getResult()
|
||||
.getStatusCode())
|
||||
.describedAs(
|
||||
"Idempotency check to happen only for HTTP 404 response.")
|
||||
.isEqualTo(HTTP_BAD_REQUEST);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient client = TestAbfsClient.createTestClientFromCurrentContext(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
|
||||
// Case 1: Not a retried case should throw error back
|
||||
intercept(AbfsRestOperationException.class,
|
||||
() -> client.deletePath(
|
||||
"/NonExistingPath",
|
||||
false,
|
||||
null));
|
||||
|
||||
// mock idempotency check to mimic retried case
|
||||
AbfsClient mockClient = TestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
|
||||
// Case 2: Mimic retried case
|
||||
// Idempotency check on Delete always returns success
|
||||
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
|
||||
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
|
||||
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
|
||||
when(idempotencyRetOp.getResult()).thenReturn(http200Op);
|
||||
|
||||
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
||||
when(mockClient.deletePath("/NonExistingPath", false,
|
||||
null)).thenCallRealMethod();
|
||||
|
||||
Assertions.assertThat(mockClient.deletePath(
|
||||
"/NonExistingPath",
|
||||
false,
|
||||
null)
|
||||
.getResult()
|
||||
.getStatusCode())
|
||||
.describedAs("Idempotency check reports successful "
|
||||
+ "delete. 200OK should be returned")
|
||||
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
@ -42,6 +43,8 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static java.util.UUID.randomUUID;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@ -51,6 +54,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotE
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Test rename operation.
|
||||
*/
|
||||
@ -77,6 +82,16 @@ public class ITestAzureBlobFileSystemRename extends
|
||||
assertPathDoesNotExist(fs, "expected renamed", src);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameWithPreExistingDestination() throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
Path src = path("renameSrc");
|
||||
touch(src);
|
||||
Path dest = path("renameDest");
|
||||
touch(dest);
|
||||
assertRenameOutcome(fs, src, dest, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameFileUnderDir() throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
@ -197,6 +212,59 @@ public class ITestAzureBlobFileSystemRename extends
|
||||
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRenameIdempotencyTriggerHttpNotFound() throws Exception {
|
||||
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
||||
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
||||
|
||||
AbfsHttpOperation http200Op = mock(AbfsHttpOperation.class);
|
||||
when(http200Op.getStatusCode()).thenReturn(HTTP_OK);
|
||||
|
||||
// Check 1 where idempotency check fails to find dest path
|
||||
// Rename should throw exception
|
||||
testRenameIdempotencyTriggerChecks(http404Op);
|
||||
|
||||
// Check 2 where idempotency check finds the dest path
|
||||
// Renam will be successful
|
||||
testRenameIdempotencyTriggerChecks(http200Op);
|
||||
}
|
||||
|
||||
private void testRenameIdempotencyTriggerChecks(
|
||||
AbfsHttpOperation idempotencyRetHttpOp) throws Exception {
|
||||
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
AbfsClient client = TestAbfsClient.getMockAbfsClient(
|
||||
fs.getAbfsStore().getClient(),
|
||||
this.getConfiguration());
|
||||
|
||||
AbfsRestOperation idempotencyRetOp = mock(AbfsRestOperation.class);
|
||||
when(idempotencyRetOp.getResult()).thenReturn(idempotencyRetHttpOp);
|
||||
doReturn(idempotencyRetOp).when(client).renameIdempotencyCheckOp(any(),
|
||||
any(), any());
|
||||
when(client.renamePath(any(), any(), any())).thenCallRealMethod();
|
||||
|
||||
// rename on non-existing source file will trigger idempotency check
|
||||
if (idempotencyRetHttpOp.getStatusCode() == HTTP_OK) {
|
||||
// idempotency check found that destination exists and is recently created
|
||||
Assertions.assertThat(client.renamePath(
|
||||
"/NonExistingsourcepath",
|
||||
"/destpath",
|
||||
null)
|
||||
.getResult()
|
||||
.getStatusCode())
|
||||
.describedAs("Idempotency check reports recent successful "
|
||||
+ "rename. 200OK should be returned")
|
||||
.isEqualTo(idempotencyRetOp.getResult().getStatusCode());
|
||||
} else {
|
||||
// rename dest not found. Original exception should be returned.
|
||||
intercept(AbfsRestOperationException.class,
|
||||
() -> client.renamePath(
|
||||
"/NonExistingsourcepath",
|
||||
"/destpath",
|
||||
""));
|
||||
}
|
||||
}
|
||||
|
||||
private void testRenameTimeout(
|
||||
int renameRequestStatus,
|
||||
int renameIdempotencyCheckStatus,
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.util.regex.Pattern;
|
||||
@ -33,6 +34,9 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
|
||||
@ -271,4 +275,76 @@ public final class TestAbfsClient {
|
||||
|
||||
return testClient;
|
||||
}
|
||||
|
||||
public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
AbfsConfiguration abfsConfig)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
AuthType currentAuthType = abfsConfig.getAuthType(
|
||||
abfsConfig.getAccountName());
|
||||
|
||||
org.junit.Assume.assumeTrue(
|
||||
(currentAuthType == AuthType.SharedKey)
|
||||
|| (currentAuthType == AuthType.OAuth));
|
||||
|
||||
AbfsClient client = mock(AbfsClient.class);
|
||||
AbfsPerfTracker tracker = new AbfsPerfTracker(
|
||||
"test",
|
||||
abfsConfig.getAccountName(),
|
||||
abfsConfig);
|
||||
|
||||
when(client.getAbfsPerfTracker()).thenReturn(tracker);
|
||||
when(client.getAuthType()).thenReturn(currentAuthType);
|
||||
when(client.getRetryPolicy()).thenReturn(
|
||||
new ExponentialRetryPolicy(1));
|
||||
|
||||
when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
|
||||
when(client.createRequestUrl(any(), any())).thenCallRealMethod();
|
||||
when(client.getAccessToken()).thenCallRealMethod();
|
||||
when(client.getSharedKeyCredentials()).thenCallRealMethod();
|
||||
when(client.createDefaultHeaders()).thenCallRealMethod();
|
||||
|
||||
// override baseurl
|
||||
Field baseUrlField = AbfsClient.class.getDeclaredField("baseUrl");
|
||||
baseUrlField.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(baseUrlField, baseUrlField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
|
||||
baseUrlField.set(client, baseAbfsClientInstance.getBaseUrl());
|
||||
|
||||
// override auth provider
|
||||
if (currentAuthType == AuthType.SharedKey) {
|
||||
Field sharedKeyCredsField = AbfsClient.class.getDeclaredField(
|
||||
"sharedKeyCredentials");
|
||||
sharedKeyCredsField.setAccessible(true);
|
||||
modifiersField.setInt(sharedKeyCredsField,
|
||||
sharedKeyCredsField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
sharedKeyCredsField.set(client, new SharedKeyCredentials(
|
||||
abfsConfig.getAccountName().substring(0,
|
||||
abfsConfig.getAccountName().indexOf(DOT)),
|
||||
abfsConfig.getStorageAccountKey()));
|
||||
} else {
|
||||
Field tokenProviderField = AbfsClient.class.getDeclaredField(
|
||||
"tokenProvider");
|
||||
tokenProviderField.setAccessible(true);
|
||||
modifiersField.setInt(tokenProviderField,
|
||||
tokenProviderField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
tokenProviderField.set(client, abfsConfig.getTokenProvider());
|
||||
}
|
||||
|
||||
// override user agent
|
||||
String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild "
|
||||
+ "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; "
|
||||
+ "UNKNOWN/UNKNOWN) MSFT";
|
||||
Field userAgentField = AbfsClient.class.getDeclaredField(
|
||||
"userAgent");
|
||||
userAgentField.setAccessible(true);
|
||||
modifiersField.setInt(userAgentField,
|
||||
userAgentField.getModifiers()
|
||||
& ~java.lang.reflect.Modifier.FINAL);
|
||||
userAgentField.set(client, userAgent);
|
||||
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user