Hadoop-17015. ABFS: Handling Rename and Delete idempotency
Contributed by Sneha Vijayarajan.
This commit is contained in:
parent
d4e36409d4
commit
8f78aeb250
@ -764,6 +764,11 @@ public void setMaxIoRetries(int maxIoRetries) {
|
|||||||
this.maxIoRetries = maxIoRetries;
|
this.maxIoRetries = maxIoRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setMaxBackoffIntervalMilliseconds(int maxBackoffInterval) {
|
||||||
|
this.maxBackoffInterval = maxBackoffInterval;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
|
void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
|
||||||
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
|
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
|
||||||
|
@ -34,7 +34,6 @@
|
|||||||
import java.nio.charset.CharsetDecoder;
|
import java.nio.charset.CharsetDecoder;
|
||||||
import java.nio.charset.CharsetEncoder;
|
import java.nio.charset.CharsetEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.text.ParseException;
|
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -96,6 +95,7 @@
|
|||||||
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
|
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.Base64;
|
import org.apache.hadoop.fs.azurebfs.utils.Base64;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
|
import org.apache.hadoop.fs.azurebfs.utils.CRC64;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
|
||||||
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
|
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
|
||||||
import org.apache.hadoop.fs.permission.AclEntry;
|
import org.apache.hadoop.fs.permission.AclEntry;
|
||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
@ -128,7 +128,6 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||||||
private URI uri;
|
private URI uri;
|
||||||
private String userName;
|
private String userName;
|
||||||
private String primaryUserGroup;
|
private String primaryUserGroup;
|
||||||
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
|
|
||||||
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
|
private static final String TOKEN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSSSSS'Z'";
|
||||||
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
|
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
|
||||||
private static final int GET_SET_AGGREGATE_COUNT = 2;
|
private static final int GET_SET_AGGREGATE_COUNT = 2;
|
||||||
@ -672,7 +671,7 @@ public FileStatus getFileStatus(final Path path) throws IOException {
|
|||||||
resourceIsDir,
|
resourceIsDir,
|
||||||
1,
|
1,
|
||||||
blockSize,
|
blockSize,
|
||||||
parseLastModifiedTime(lastModified),
|
DateTimeUtils.parseLastModifiedTime(lastModified),
|
||||||
path,
|
path,
|
||||||
eTag);
|
eTag);
|
||||||
}
|
}
|
||||||
@ -748,7 +747,8 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I
|
|||||||
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
|
long contentLength = entry.contentLength() == null ? 0 : entry.contentLength();
|
||||||
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
|
boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory();
|
||||||
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
|
if (entry.lastModified() != null && !entry.lastModified().isEmpty()) {
|
||||||
lastModifiedMillis = parseLastModifiedTime(entry.lastModified());
|
lastModifiedMillis = DateTimeUtils.parseLastModifiedTime(
|
||||||
|
entry.lastModified());
|
||||||
}
|
}
|
||||||
|
|
||||||
Path entryPath = new Path(File.separator + entry.name());
|
Path entryPath = new Path(File.separator + entry.name());
|
||||||
@ -1235,18 +1235,6 @@ private boolean parseIsDirectory(final String resourceType) {
|
|||||||
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
|
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long parseLastModifiedTime(final String lastModifiedTime) {
|
|
||||||
long parsedTime = 0;
|
|
||||||
try {
|
|
||||||
Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN, Locale.US).parse(lastModifiedTime);
|
|
||||||
parsedTime = utcDate.getTime();
|
|
||||||
} catch (ParseException e) {
|
|
||||||
LOG.error("Failed to parse the date {}", lastModifiedTime);
|
|
||||||
} finally {
|
|
||||||
return parsedTime;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
|
private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws
|
||||||
CharacterCodingException {
|
CharacterCodingException {
|
||||||
StringBuilder commaSeparatedProperties = new StringBuilder();
|
StringBuilder commaSeparatedProperties = new StringBuilder();
|
||||||
|
@ -81,5 +81,8 @@ public final class FileSystemConfigurations {
|
|||||||
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
|
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
|
||||||
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
|
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_DELETE_CONSIDERED_IDEMPOTENT = true;
|
||||||
|
public static final int DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS = 5 * 60 * 1000; // 5 mins
|
||||||
|
|
||||||
private FileSystemConfigurations() {}
|
private FileSystemConfigurations() {}
|
||||||
}
|
}
|
||||||
|
@ -21,9 +21,11 @@
|
|||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.net.HttpURLConnection;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
@ -44,9 +46,11 @@
|
|||||||
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
|
||||||
@ -320,7 +324,51 @@ public AbfsRestOperation renamePath(String source, final String destination, fin
|
|||||||
HTTP_METHOD_PUT,
|
HTTP_METHOD_PUT,
|
||||||
url,
|
url,
|
||||||
requestHeaders);
|
requestHeaders);
|
||||||
|
Instant renameRequestStartTime = Instant.now();
|
||||||
op.execute();
|
op.execute();
|
||||||
|
|
||||||
|
if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||||
|
return renameIdempotencyCheckOp(renameRequestStartTime, op, destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
return op;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the rename request failure is post a retry and if earlier rename
|
||||||
|
* request might have succeeded at back-end.
|
||||||
|
*
|
||||||
|
* If there is a parallel rename activity happening from any other store
|
||||||
|
* interface, the logic here will detect the rename to have happened due to
|
||||||
|
* the one initiated from this ABFS filesytem instance as it was retried. This
|
||||||
|
* should be a corner case hence going ahead with LMT check.
|
||||||
|
* @param renameRequestStartTime startTime for the rename request
|
||||||
|
* @param op Rename request REST operation response
|
||||||
|
* @param destination rename destination path
|
||||||
|
* @return REST operation response post idempotency check
|
||||||
|
* @throws AzureBlobFileSystemException if GetFileStatus hits any exception
|
||||||
|
*/
|
||||||
|
public AbfsRestOperation renameIdempotencyCheckOp(
|
||||||
|
final Instant renameRequestStartTime,
|
||||||
|
final AbfsRestOperation op,
|
||||||
|
final String destination) throws AzureBlobFileSystemException {
|
||||||
|
if ((op.isARetriedRequest())
|
||||||
|
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) {
|
||||||
|
// Server has returned HTTP 404, which means rename source no longer
|
||||||
|
// exists. Check on destination status and if it has a recent LMT timestamp.
|
||||||
|
// If yes, return success, else fall back to original rename request failure response.
|
||||||
|
|
||||||
|
final AbfsRestOperation destStatusOp = getPathStatus(destination, false);
|
||||||
|
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
|
||||||
|
String lmt = destStatusOp.getResult().getResponseHeader(
|
||||||
|
HttpHeaderConfigurations.LAST_MODIFIED);
|
||||||
|
|
||||||
|
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
|
||||||
|
return destStatusOp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return op;
|
return op;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -476,6 +524,45 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
|
|||||||
url,
|
url,
|
||||||
requestHeaders);
|
requestHeaders);
|
||||||
op.execute();
|
op.execute();
|
||||||
|
|
||||||
|
if (op.getResult().getStatusCode() != HttpURLConnection.HTTP_OK) {
|
||||||
|
return deleteIdempotencyCheckOp(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
return op;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the delete request failure is post a retry and if delete failure
|
||||||
|
* qualifies to be a success response assuming idempotency.
|
||||||
|
*
|
||||||
|
* There are below scenarios where delete could be incorrectly deducted as
|
||||||
|
* success post request retry:
|
||||||
|
* 1. Target was originally not existing and initial delete request had to be
|
||||||
|
* re-tried.
|
||||||
|
* 2. Parallel delete issued from any other store interface rather than
|
||||||
|
* delete issued from this filesystem instance.
|
||||||
|
* These are few corner cases and usually returning a success at this stage
|
||||||
|
* should help the job to continue.
|
||||||
|
* @param op Delete request REST operation response
|
||||||
|
* @return REST operation response post idempotency check
|
||||||
|
*/
|
||||||
|
public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
|
||||||
|
if ((op.isARetriedRequest())
|
||||||
|
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
|
||||||
|
&& DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
|
||||||
|
// Server has returned HTTP 404, which means path no longer
|
||||||
|
// exists. Assuming delete result to be idempotent, return success.
|
||||||
|
final AbfsRestOperation successOp = new AbfsRestOperation(
|
||||||
|
AbfsRestOperationType.DeletePath,
|
||||||
|
this,
|
||||||
|
HTTP_METHOD_DELETE,
|
||||||
|
op.getUrl(),
|
||||||
|
op.getRequestHeaders());
|
||||||
|
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
|
||||||
|
return successOp;
|
||||||
|
}
|
||||||
|
|
||||||
return op;
|
return op;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +81,19 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
|
|||||||
private long sendRequestTimeMs;
|
private long sendRequestTimeMs;
|
||||||
private long recvResponseTimeMs;
|
private long recvResponseTimeMs;
|
||||||
|
|
||||||
|
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(final URL url,
|
||||||
|
final String method, final int httpStatus) {
|
||||||
|
return new AbfsHttpOperation(url, method, httpStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
private AbfsHttpOperation(final URL url, final String method,
|
||||||
|
final int httpStatus) {
|
||||||
|
this.isTraceEnabled = LOG.isTraceEnabled();
|
||||||
|
this.url = url;
|
||||||
|
this.method = method;
|
||||||
|
this.statusCode = httpStatus;
|
||||||
|
}
|
||||||
|
|
||||||
protected HttpURLConnection getConnection() {
|
protected HttpURLConnection getConnection() {
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
@ -63,6 +63,7 @@ public class AbfsRestOperation {
|
|||||||
private byte[] buffer;
|
private byte[] buffer;
|
||||||
private int bufferOffset;
|
private int bufferOffset;
|
||||||
private int bufferLength;
|
private int bufferLength;
|
||||||
|
private int retryCount = 0;
|
||||||
|
|
||||||
private AbfsHttpOperation result;
|
private AbfsHttpOperation result;
|
||||||
|
|
||||||
@ -70,6 +71,23 @@ public AbfsHttpOperation getResult() {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void hardSetResult(int httpStatus) {
|
||||||
|
result = AbfsHttpOperation.getAbfsHttpOperationWithFixedResult(this.url,
|
||||||
|
this.method, httpStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
public URL getUrl() {
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AbfsHttpHeader> getRequestHeaders() {
|
||||||
|
return requestHeaders;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isARetriedRequest() {
|
||||||
|
return (retryCount > 0);
|
||||||
|
}
|
||||||
|
|
||||||
String getSasToken() {
|
String getSasToken() {
|
||||||
return sasToken;
|
return sasToken;
|
||||||
}
|
}
|
||||||
@ -157,7 +175,7 @@ void execute() throws AzureBlobFileSystemException {
|
|||||||
requestHeaders.add(httpHeader);
|
requestHeaders.add(httpHeader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int retryCount = 0;
|
retryCount = 0;
|
||||||
LOG.debug("First execution of REST operation - {}", operationType);
|
LOG.debug("First execution of REST operation - {}", operationType);
|
||||||
while (!executeHttpOperation(retryCount++)) {
|
while (!executeHttpOperation(retryCount++)) {
|
||||||
try {
|
try {
|
||||||
|
@ -0,0 +1,71 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azurebfs.utils;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Locale;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;
|
||||||
|
|
||||||
|
public final class DateTimeUtils {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(DateTimeUtils.class);
|
||||||
|
private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss z";
|
||||||
|
|
||||||
|
public static long parseLastModifiedTime(final String lastModifiedTime) {
|
||||||
|
long parsedTime = 0;
|
||||||
|
try {
|
||||||
|
Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN, Locale.US)
|
||||||
|
.parse(lastModifiedTime);
|
||||||
|
parsedTime = utcDate.getTime();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
LOG.error("Failed to parse the date {}", lastModifiedTime);
|
||||||
|
} finally {
|
||||||
|
return parsedTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to identify if an operation was recently executed based on the LMT of
|
||||||
|
* a file or folder. LMT needs to be more recent that the original request
|
||||||
|
* start time. To include any clock skew with server, LMT within
|
||||||
|
* DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS from the request start time is going
|
||||||
|
* to be considered to qualify for recent operation.
|
||||||
|
* @param lastModifiedTime File/Folder LMT
|
||||||
|
* @param expectedLMTUpdateTime original request timestamp which should
|
||||||
|
* have updated the LMT on target
|
||||||
|
* @return true if the LMT is within timespan for recent operation, else false
|
||||||
|
*/
|
||||||
|
public static boolean isRecentlyModified(final String lastModifiedTime,
|
||||||
|
final Instant expectedLMTUpdateTime) {
|
||||||
|
long lmtEpochTime = DateTimeUtils.parseLastModifiedTime(lastModifiedTime);
|
||||||
|
long currentEpochTime = expectedLMTUpdateTime.toEpochMilli();
|
||||||
|
|
||||||
|
return ((lmtEpochTime > currentEpochTime)
|
||||||
|
|| ((currentEpochTime - lmtEpochTime) <= DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS));
|
||||||
|
}
|
||||||
|
|
||||||
|
private DateTimeUtils() {
|
||||||
|
}
|
||||||
|
}
|
@ -740,6 +740,18 @@ Config `fs.azure.account.hns.enabled` provides an option to specify whether
|
|||||||
Config `fs.azure.enable.check.access` needs to be set true to enable
|
Config `fs.azure.enable.check.access` needs to be set true to enable
|
||||||
the AzureBlobFileSystem.access().
|
the AzureBlobFileSystem.access().
|
||||||
|
|
||||||
|
### <a name="idempotency"></a> Operation Idempotency
|
||||||
|
|
||||||
|
Requests failing due to server timeouts and network failures will be retried.
|
||||||
|
PUT/POST operations are idempotent and need no specific handling
|
||||||
|
except for Rename and Delete operations.
|
||||||
|
|
||||||
|
Rename idempotency checks are made by ensuring the LastModifiedTime on destination
|
||||||
|
is recent if source path is found to be non-existent on retry.
|
||||||
|
|
||||||
|
Delete is considered to be idempotent by default if the target does not exist on
|
||||||
|
retry.
|
||||||
|
|
||||||
### <a name="featureconfigoptions"></a> Primary User Group Options
|
### <a name="featureconfigoptions"></a> Primary User Group Options
|
||||||
The group name which is part of FileStatus and AclStatus will be set the same as
|
The group name which is part of FileStatus and AclStatus will be set the same as
|
||||||
the username if the following config is set to true
|
the username if the following config is set to true
|
||||||
|
@ -26,22 +26,39 @@
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Assume;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||||
|
import static java.net.HttpURLConnection.HTTP_OK;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test delete operation.
|
* Test delete operation.
|
||||||
*/
|
*/
|
||||||
public class ITestAzureBlobFileSystemDelete extends
|
public class ITestAzureBlobFileSystemDelete extends
|
||||||
AbstractAbfsIntegrationTest {
|
AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
|
private static final int REDUCED_RETRY_COUNT = 1;
|
||||||
|
private static final int REDUCED_MAX_BACKOFF_INTERVALS_MS = 5000;
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemDelete() throws Exception {
|
public ITestAzureBlobFileSystemDelete() throws Exception {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
@ -130,4 +147,40 @@ public Void call() throws Exception {
|
|||||||
assertPathDoesNotExist(fs, "deleted", dir);
|
assertPathDoesNotExist(fs, "deleted", dir);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteIdempotency() throws Exception {
|
||||||
|
Assume.assumeTrue(DEFAULT_DELETE_CONSIDERED_IDEMPOTENT);
|
||||||
|
// Config to reduce the retry and maxBackoff time for test run
|
||||||
|
AbfsConfiguration abfsConfig
|
||||||
|
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||||
|
getConfiguration(),
|
||||||
|
REDUCED_RETRY_COUNT, REDUCED_MAX_BACKOFF_INTERVALS_MS);
|
||||||
|
|
||||||
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||||
|
AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
|
||||||
|
abfsClient,
|
||||||
|
abfsConfig);
|
||||||
|
|
||||||
|
// Mock instance of AbfsRestOperation
|
||||||
|
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
||||||
|
// Set retryCount to non-zero
|
||||||
|
when(op.isARetriedRequest()).thenReturn(true);
|
||||||
|
|
||||||
|
// Mock instance of Http Operation response. This will return HTTP:Not Found
|
||||||
|
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
||||||
|
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
||||||
|
|
||||||
|
// Mock delete response to 404
|
||||||
|
when(op.getResult()).thenReturn(http404Op);
|
||||||
|
|
||||||
|
Assertions.assertThat(testClient.deleteIdempotencyCheckOp(op)
|
||||||
|
.getResult()
|
||||||
|
.getStatusCode())
|
||||||
|
.describedAs(
|
||||||
|
"Delete is considered idempotent by default and should return success.")
|
||||||
|
.isEqualTo(HTTP_OK);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.fs.azurebfs;
|
package org.apache.hadoop.fs.azurebfs;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
@ -25,12 +26,26 @@
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.assertj.core.api.Assertions;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
||||||
|
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||||
|
import static java.net.HttpURLConnection.HTTP_OK;
|
||||||
|
import static java.util.UUID.randomUUID;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome;
|
||||||
@ -42,6 +57,9 @@
|
|||||||
public class ITestAzureBlobFileSystemRename extends
|
public class ITestAzureBlobFileSystemRename extends
|
||||||
AbstractAbfsIntegrationTest {
|
AbstractAbfsIntegrationTest {
|
||||||
|
|
||||||
|
private static final int REDUCED_RETRY_COUNT = 1;
|
||||||
|
private static final int REDUCED_MAX_BACKOFF_INTERVALS_MS = 5000;
|
||||||
|
|
||||||
public ITestAzureBlobFileSystemRename() throws Exception {
|
public ITestAzureBlobFileSystemRename() throws Exception {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
@ -149,4 +167,97 @@ public void testPosixRenameDirectory() throws Exception {
|
|||||||
assertTrue(fs.exists(new Path("testDir2/test4/test3")));
|
assertTrue(fs.exists(new Path("testDir2/test4/test3")));
|
||||||
assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
|
assertFalse(fs.exists(new Path("testDir2/test1/test2/test3")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameRetryFailureAsHTTP400() throws Exception {
|
||||||
|
// Rename failed as Bad Request
|
||||||
|
// RenameIdempotencyCheck should throw back the rename failure Op
|
||||||
|
testRenameTimeout(HTTP_BAD_REQUEST, HTTP_BAD_REQUEST, false,
|
||||||
|
"renameIdempotencyCheckOp should return rename BadRequest "
|
||||||
|
+ "response itself.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameRetryFailureAsHTTP404() throws Exception {
|
||||||
|
// Rename failed as FileNotFound and the destination LMT is
|
||||||
|
// within TimespanForIdentifyingRecentOperationThroughLMT
|
||||||
|
testRenameTimeout(HTTP_NOT_FOUND, HTTP_OK, false,
|
||||||
|
"Rename should return success response because the destination "
|
||||||
|
+ "path is present and its LMT is within "
|
||||||
|
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRenameRetryFailureWithDestOldLMT() throws Exception {
|
||||||
|
// Rename failed as FileNotFound and the destination LMT is
|
||||||
|
// older than TimespanForIdentifyingRecentOperationThroughLMT
|
||||||
|
testRenameTimeout(HTTP_NOT_FOUND, HTTP_NOT_FOUND, true,
|
||||||
|
"Rename should return original rename failure response "
|
||||||
|
+ "because the destination path LMT is older than "
|
||||||
|
+ "TimespanForIdentifyingRecentOperationThroughLMT.");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testRenameTimeout(
|
||||||
|
int renameRequestStatus,
|
||||||
|
int renameIdempotencyCheckStatus,
|
||||||
|
boolean isOldOp,
|
||||||
|
String assertMessage) throws Exception {
|
||||||
|
// Config to reduce the retry and maxBackoff time for test run
|
||||||
|
AbfsConfiguration abfsConfig
|
||||||
|
= TestAbfsConfigurationFieldsValidation.updateRetryConfigs(
|
||||||
|
getConfiguration(),
|
||||||
|
REDUCED_RETRY_COUNT, REDUCED_MAX_BACKOFF_INTERVALS_MS);
|
||||||
|
|
||||||
|
final AzureBlobFileSystem fs = getFileSystem();
|
||||||
|
AbfsClient abfsClient = fs.getAbfsStore().getClient();
|
||||||
|
AbfsClient testClient = TestAbfsClient.createTestClientFromCurrentContext(
|
||||||
|
abfsClient,
|
||||||
|
abfsConfig);
|
||||||
|
|
||||||
|
// Mock instance of AbfsRestOperation
|
||||||
|
AbfsRestOperation op = mock(AbfsRestOperation.class);
|
||||||
|
// Set retryCount to non-zero
|
||||||
|
when(op.isARetriedRequest()).thenReturn(true);
|
||||||
|
|
||||||
|
// Mock instance of Http Operation response. This will return HTTP:Bad Request
|
||||||
|
AbfsHttpOperation http400Op = mock(AbfsHttpOperation.class);
|
||||||
|
when(http400Op.getStatusCode()).thenReturn(HTTP_BAD_REQUEST);
|
||||||
|
|
||||||
|
// Mock instance of Http Operation response. This will return HTTP:Not Found
|
||||||
|
AbfsHttpOperation http404Op = mock(AbfsHttpOperation.class);
|
||||||
|
when(http404Op.getStatusCode()).thenReturn(HTTP_NOT_FOUND);
|
||||||
|
|
||||||
|
Path destinationPath = fs.makeQualified(
|
||||||
|
new Path("destination" + randomUUID().toString()));
|
||||||
|
|
||||||
|
Instant renameRequestStartTime = Instant.now();
|
||||||
|
|
||||||
|
if (renameRequestStatus == HTTP_BAD_REQUEST) {
|
||||||
|
when(op.getResult()).thenReturn(http400Op);
|
||||||
|
} else if (renameRequestStatus == HTTP_NOT_FOUND) {
|
||||||
|
// Create the file new.
|
||||||
|
fs.create(destinationPath);
|
||||||
|
when(op.getResult()).thenReturn(http404Op);
|
||||||
|
|
||||||
|
if (isOldOp) {
|
||||||
|
// instead of sleeping for DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS
|
||||||
|
// which will affect test run time
|
||||||
|
// will modify renameRequestStartTime to a future time so that
|
||||||
|
// lmt will qualify for old op
|
||||||
|
renameRequestStartTime = renameRequestStartTime.plusSeconds(
|
||||||
|
DEFAULT_CLOCK_SKEW_WITH_SERVER_IN_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
Assertions.assertThat(testClient.renameIdempotencyCheckOp(
|
||||||
|
renameRequestStartTime,
|
||||||
|
op,
|
||||||
|
destinationPath.toUri().getPath())
|
||||||
|
.getResult()
|
||||||
|
.getStatusCode())
|
||||||
|
.describedAs(assertMessage)
|
||||||
|
.isEqualTo(renameIdempotencyCheckStatus);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -182,4 +182,11 @@ public void testSSLSocketFactoryConfiguration()
|
|||||||
assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
assertEquals(DelegatingSSLSocketFactory.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static AbfsConfiguration updateRetryConfigs(AbfsConfiguration abfsConfig,
|
||||||
|
int retryCount,
|
||||||
|
int backoffTime) {
|
||||||
|
abfsConfig.setMaxIoRetries(retryCount);
|
||||||
|
abfsConfig.setMaxBackoffIntervalMilliseconds(backoffTime);
|
||||||
|
return abfsConfig;
|
||||||
|
}
|
||||||
}
|
}
|
@ -26,6 +26,7 @@
|
|||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||||
|
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||||
@ -35,6 +36,7 @@
|
|||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION;
|
||||||
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
|
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR;
|
||||||
@ -240,4 +242,25 @@ public void verifyUserAgentClusterType() throws Exception {
|
|||||||
.contains(DEFAULT_VALUE_UNKNOWN);
|
.contains(DEFAULT_VALUE_UNKNOWN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static AbfsClient createTestClientFromCurrentContext(
|
||||||
|
AbfsClient baseAbfsClientInstance,
|
||||||
|
AbfsConfiguration abfsConfig)
|
||||||
|
throws AzureBlobFileSystemException {
|
||||||
|
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
|
||||||
|
abfsConfig.getAccountName(),
|
||||||
|
abfsConfig);
|
||||||
|
|
||||||
|
// Create test AbfsClient
|
||||||
|
AbfsClient testClient = new AbfsClient(
|
||||||
|
baseAbfsClientInstance.getBaseUrl(),
|
||||||
|
new SharedKeyCredentials(abfsConfig.getAccountName().substring(0,
|
||||||
|
abfsConfig.getAccountName().indexOf(DOT)),
|
||||||
|
abfsConfig.getStorageAccountKey()),
|
||||||
|
abfsConfig,
|
||||||
|
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()),
|
||||||
|
abfsConfig.getTokenProvider(),
|
||||||
|
tracker);
|
||||||
|
|
||||||
|
return testClient;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user