HADOOP-18656. [ABFS] Add Support for Paginated Delete for Large Directories in HNS Account (#6409)
Contributed by Anuj Modi
This commit is contained in:
parent
d7157b4aa9
commit
6ed73896f6
@ -363,6 +363,10 @@ public class AbfsConfiguration{
|
||||
FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
|
||||
private boolean isChecksumValidationEnabled;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey =
|
||||
FS_AZURE_ENABLE_PAGINATED_DELETE, DefaultValue = DEFAULT_ENABLE_PAGINATED_DELETE)
|
||||
private boolean isPaginatedDeleteEnabled;
|
||||
|
||||
private String clientProvidedEncryptionKey;
|
||||
private String clientProvidedEncryptionKeySHA;
|
||||
|
||||
@ -1240,8 +1244,8 @@ public boolean getRenameResilience() {
|
||||
return renameResilience;
|
||||
}
|
||||
|
||||
void setRenameResilience(boolean actualResilience) {
|
||||
renameResilience = actualResilience;
|
||||
public boolean isPaginatedDeleteEnabled() {
|
||||
return isPaginatedDeleteEnabled;
|
||||
}
|
||||
|
||||
public boolean getIsChecksumValidationEnabled() {
|
||||
|
@ -1077,8 +1077,8 @@ public void delete(final Path path, final boolean recursive,
|
||||
|
||||
do {
|
||||
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
|
||||
AbfsRestOperation op = client
|
||||
.deletePath(relativePath, recursive, continuation, tracingContext);
|
||||
AbfsRestOperation op = client.deletePath(relativePath, recursive,
|
||||
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
|
||||
perfInfo.registerResult(op.getResult());
|
||||
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
|
||||
perfInfo.registerSuccess(true);
|
||||
|
@ -121,8 +121,37 @@ public final class AbfsHttpConstants {
|
||||
public static final char CHAR_EQUALS = '=';
|
||||
public static final char CHAR_STAR = '*';
|
||||
public static final char CHAR_PLUS = '+';
|
||||
public static final String DECEMBER_2019_API_VERSION = "2019-12-12";
|
||||
public static final String APRIL_2021_API_VERSION = "2021-04-10";
|
||||
|
||||
/**
|
||||
* Specifies the version of the REST protocol used for processing the request.
|
||||
* Versions should be added in enum list in ascending chronological order.
|
||||
* Latest one should be added last in the list.
|
||||
* When upgrading the version for whole driver, update the getCurrentVersion;
|
||||
*/
|
||||
public enum ApiVersion {
|
||||
|
||||
DEC_12_2019("2019-12-12"),
|
||||
APR_10_2021("2021-04-10"),
|
||||
AUG_03_2023("2023-08-03");
|
||||
|
||||
private final String xMsApiVersion;
|
||||
|
||||
ApiVersion(String xMsApiVersion) {
|
||||
this.xMsApiVersion = xMsApiVersion;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return xMsApiVersion;
|
||||
}
|
||||
|
||||
public static ApiVersion getCurrentVersion() {
|
||||
return DEC_12_2019;
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public static final String DECEMBER_2019_API_VERSION = ApiVersion.DEC_12_2019.toString();
|
||||
|
||||
/**
|
||||
* Value that differentiates categories of the http_status.
|
||||
|
@ -275,6 +275,11 @@ public final class ConfigurationKeys {
|
||||
/** Add extra resilience to rename failures, at the expense of performance. */
|
||||
public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";
|
||||
|
||||
/**
|
||||
* Specify whether paginated behavior is to be expected or not in delete path. {@value}
|
||||
*/
|
||||
public static final String FS_AZURE_ENABLE_PAGINATED_DELETE = "fs.azure.enable.paginated.delete";
|
||||
|
||||
/** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
|
||||
public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";
|
||||
|
||||
|
@ -133,6 +133,7 @@ public final class FileSystemConfigurations {
|
||||
public static final int STREAM_ID_LEN = 12;
|
||||
public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
|
||||
public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
|
||||
public static final boolean DEFAULT_ENABLE_PAGINATED_DELETE = false;
|
||||
public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;
|
||||
|
||||
/**
|
||||
|
@ -40,6 +40,7 @@ public final class HttpQueryParams {
|
||||
public static final String QUERY_PARAM_CLOSE = "close";
|
||||
public static final String QUERY_PARAM_UPN = "upn";
|
||||
public static final String QUERY_PARAM_BLOBTYPE = "blobtype";
|
||||
public static final String QUERY_PARAM_PAGINATED = "paginated";
|
||||
|
||||
//query params for SAS
|
||||
public static final String QUERY_PARAM_SAOID = "saoid";
|
||||
|
@ -100,7 +100,7 @@ public class AbfsClient implements Closeable {
|
||||
|
||||
private final URL baseUrl;
|
||||
private final SharedKeyCredentials sharedKeyCredentials;
|
||||
private String xMsVersion = DECEMBER_2019_API_VERSION;
|
||||
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion();
|
||||
private final ExponentialRetryPolicy exponentialRetryPolicy;
|
||||
private final StaticRetryPolicy staticRetryPolicy;
|
||||
private final String filesystem;
|
||||
@ -122,7 +122,6 @@ public class AbfsClient implements Closeable {
|
||||
private final ListeningScheduledExecutorService executorService;
|
||||
private Boolean isNamespaceEnabled;
|
||||
|
||||
|
||||
private boolean renameResilience;
|
||||
|
||||
/**
|
||||
@ -149,7 +148,7 @@ private AbfsClient(final URL baseUrl,
|
||||
|
||||
if (encryptionContextProvider != null) {
|
||||
this.encryptionContextProvider = encryptionContextProvider;
|
||||
xMsVersion = APRIL_2021_API_VERSION; // will be default once server change deployed
|
||||
xMsVersion = ApiVersion.APR_10_2021; // will be default once server change deployed
|
||||
encryptionType = EncryptionType.ENCRYPTION_CONTEXT;
|
||||
} else if (abfsConfiguration.getEncodedClientProvidedEncryptionKey() != null) {
|
||||
clientProvidedEncryptionKey =
|
||||
@ -259,13 +258,27 @@ AbfsThrottlingIntercept getIntercept() {
|
||||
return intercept;
|
||||
}
|
||||
|
||||
List<AbfsHttpHeader> createDefaultHeaders() {
|
||||
/**
|
||||
* Create request headers for Rest Operation using the current API version.
|
||||
* @return default request headers
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected List<AbfsHttpHeader> createDefaultHeaders() {
|
||||
return createDefaultHeaders(this.xMsVersion);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create request headers for Rest Operation using the specified API version.
|
||||
* @param xMsVersion
|
||||
* @return default request headers
|
||||
*/
|
||||
private List<AbfsHttpHeader> createDefaultHeaders(ApiVersion xMsVersion) {
|
||||
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
|
||||
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
|
||||
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString()));
|
||||
requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
|
||||
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
|
||||
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
|
||||
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET,
|
||||
UTF_8));
|
||||
UTF_8));
|
||||
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
|
||||
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent));
|
||||
return requestHeaders;
|
||||
@ -1117,12 +1130,29 @@ public AbfsRestOperation read(final String path,
|
||||
return op;
|
||||
}
|
||||
|
||||
public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation,
|
||||
TracingContext tracingContext)
|
||||
public AbfsRestOperation deletePath(final String path, final boolean recursive,
|
||||
final String continuation,
|
||||
TracingContext tracingContext,
|
||||
final boolean isNamespaceEnabled)
|
||||
throws AzureBlobFileSystemException {
|
||||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
|
||||
|
||||
/*
|
||||
* If Pagination is enabled and current API version is old,
|
||||
* use the minimum required version for pagination.
|
||||
* If Pagination is enabled and current API version is later than minimum required
|
||||
* version for pagination, use current version only as azure service is backward compatible.
|
||||
* If pagination is disabled, use the current API version only.
|
||||
*/
|
||||
final List<AbfsHttpHeader> requestHeaders = (isPaginatedDelete(recursive,
|
||||
isNamespaceEnabled) && xMsVersion.compareTo(ApiVersion.AUG_03_2023) < 0)
|
||||
? createDefaultHeaders(ApiVersion.AUG_03_2023)
|
||||
: createDefaultHeaders();
|
||||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
|
||||
|
||||
if (isPaginatedDelete(recursive, isNamespaceEnabled)) {
|
||||
// Add paginated query parameter
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE);
|
||||
}
|
||||
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
|
||||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
|
||||
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
|
||||
@ -1465,6 +1495,14 @@ private synchronized Boolean getIsNamespaceEnabled(TracingContext tracingContext
|
||||
return isNamespaceEnabled;
|
||||
}
|
||||
|
||||
protected Boolean getIsPaginatedDeleteEnabled() {
|
||||
return abfsConfiguration.isPaginatedDeleteEnabled();
|
||||
}
|
||||
|
||||
private Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
|
||||
return getIsPaginatedDeleteEnabled() && isNamespaceEnabled && isRecursiveDelete;
|
||||
}
|
||||
|
||||
public AuthType getAuthType() {
|
||||
return authType;
|
||||
}
|
||||
@ -1659,7 +1697,7 @@ protected AbfsCounters getAbfsCounters() {
|
||||
return abfsCounters;
|
||||
}
|
||||
|
||||
public String getxMsVersion() {
|
||||
public ApiVersion getxMsVersion() {
|
||||
return xMsVersion;
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -215,6 +216,7 @@ public void setup() throws Exception {
|
||||
wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
|
||||
wasb.initialize(wasbUri, rawConfig);
|
||||
}
|
||||
// Todo: To be fixed in HADOOP-19137
|
||||
AbfsClientUtils.setIsNamespaceEnabled(abfs.getAbfsClient(), true);
|
||||
}
|
||||
|
||||
@ -532,4 +534,10 @@ protected long assertAbfsStatistics(AbfsStatistic statistic,
|
||||
(long) metricMap.get(statistic.getStatName()));
|
||||
return expectedValue;
|
||||
}
|
||||
|
||||
protected void assumeValidTestConfigPresent(final Configuration conf, final String key) {
|
||||
String configuredValue = conf.get(key);
|
||||
Assume.assumeTrue(String.format("Missing Required Test Config: %s.", key),
|
||||
configuredValue != null && !configuredValue.isEmpty());
|
||||
}
|
||||
}
|
||||
|
@ -323,8 +323,9 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
|
||||
return client.renamePath(path, new Path(path + "_2").toString(),
|
||||
null, tc, null, false, fs.getIsNamespaceEnabled(tc)).getOp();
|
||||
case DELETE:
|
||||
TracingContext testTC = getTestTracingContext(fs, false);
|
||||
return client.deletePath(path, false, null,
|
||||
getTestTracingContext(fs, false));
|
||||
testTC, fs.getIsNamespaceEnabled(testTC));
|
||||
case GET_ATTR:
|
||||
return client.getPathStatus(path, true,
|
||||
getTestTracingContext(fs, false),
|
||||
|
@ -242,7 +242,8 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
"/NonExistingPath",
|
||||
false,
|
||||
null,
|
||||
getTestTracingContext(fs, true)));
|
||||
getTestTracingContext(fs, true),
|
||||
fs.getIsNamespaceEnabled(getTestTracingContext(fs, true))));
|
||||
|
||||
// mock idempotency check to mimic retried case
|
||||
AbfsClient mockClient = ITestAbfsClient.getMockAbfsClient(
|
||||
@ -269,14 +270,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
|
||||
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
|
||||
TracingContext tracingContext = getTestTracingContext(fs, false);
|
||||
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
|
||||
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
|
||||
when(mockClient.deletePath("/NonExistingPath", false, null,
|
||||
tracingContext, fs.getIsNamespaceEnabled(tracingContext)))
|
||||
.thenCallRealMethod();
|
||||
|
||||
Assertions.assertThat(mockClient.deletePath(
|
||||
"/NonExistingPath",
|
||||
false,
|
||||
null,
|
||||
tracingContext)
|
||||
tracingContext, fs.getIsNamespaceEnabled(tracingContext))
|
||||
.getResult()
|
||||
.getStatusCode())
|
||||
.describedAs("Idempotency check reports successful "
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
|
||||
|
||||
public final class AbfsClientUtils {
|
||||
@ -31,4 +33,13 @@ public static void setIsNamespaceEnabled(final AbfsClient abfsClient, final Bool
|
||||
public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) {
|
||||
abfsClient.setEncryptionContextProvider(provider);
|
||||
}
|
||||
|
||||
public static String getHeaderValue(List<AbfsHttpHeader> reqHeaders, String headerName) {
|
||||
for (AbfsHttpHeader header : reqHeaders) {
|
||||
if (header.getName().equals(headerName)) {
|
||||
return header.getValue();
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
@ -413,7 +413,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
|
||||
return client;
|
||||
}
|
||||
|
||||
private static AbfsClient setAbfsClientField(
|
||||
static AbfsClient setAbfsClientField(
|
||||
final AbfsClient client,
|
||||
final String fieldName,
|
||||
Object fieldObject) throws Exception {
|
||||
|
@ -0,0 +1,333 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.util.Lists;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SECURE_SCHEME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_VERSION;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PAGINATED;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_ID;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_FS_CLIENT_SECRET;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
|
||||
import static org.apache.hadoop.fs.azurebfs.services.AbfsClientUtils.getHeaderValue;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Tests to verify server side pagination feature is supported from driver.
|
||||
*/
|
||||
public class ITestAbfsPaginatedDelete extends AbstractAbfsIntegrationTest {
|
||||
|
||||
/**
|
||||
* File system using super-user OAuth, used to create the directory.
|
||||
*/
|
||||
private AzureBlobFileSystem superUserFs;
|
||||
|
||||
/**
|
||||
* File system using NoRBAC user OAuth, used to delete the directory.
|
||||
* This user will have default ACL permissions set on root path including delete.
|
||||
* Since this is not a super-user, azure servers will trigger recursive ACL
|
||||
* checks on root path when delete is called using this user OAuth token.
|
||||
*/
|
||||
private AzureBlobFileSystem testUserFs;
|
||||
|
||||
/**
|
||||
* Service supports Pagination only for HNS Accounts.
|
||||
*/
|
||||
private boolean isHnsEnabled;
|
||||
|
||||
public ITestAbfsPaginatedDelete() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create file system instances for both super-user and test user.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
this.superUserFs = getFileSystem();
|
||||
|
||||
assumeValidTestConfigPresent(this.getRawConfiguration(),
|
||||
FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT);
|
||||
isHnsEnabled = this.getConfiguration().getBoolean(
|
||||
FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
|
||||
|
||||
assumeTestUserCredentialsConfigured();
|
||||
this.testUserFs = isHnsEnabled ? createTestUserFs() : null;
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem createTestUserFs() throws IOException {
|
||||
// Test User Credentials.
|
||||
String firstTestUserGuid = getConfiguration().get(
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
|
||||
String clientId = getConfiguration().getString(
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID, "");
|
||||
String clientSecret = getConfiguration().getString(
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET, "");
|
||||
|
||||
Configuration testUserConf = new Configuration(getRawConfiguration());
|
||||
setTestUserConf(testUserConf, FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.OAuth.name());
|
||||
setTestUserConf(testUserConf, FS_AZURE_BLOB_FS_CLIENT_ID, clientId);
|
||||
setTestUserConf(testUserConf, FS_AZURE_BLOB_FS_CLIENT_SECRET, clientSecret);
|
||||
setTestUserConf(testUserConf, FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME,
|
||||
ClientCredsTokenProvider.class.getName());
|
||||
|
||||
testUserConf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false);
|
||||
testUserConf.setBoolean(String.format("fs.%s.impl.disable.cache", ABFS_SECURE_SCHEME), true);
|
||||
|
||||
setDefaultAclOnRoot(firstTestUserGuid);
|
||||
return (AzureBlobFileSystem) FileSystem.newInstance(testUserConf);
|
||||
}
|
||||
|
||||
private void setTestUserConf(Configuration conf, String key, String value) {
|
||||
conf.set(key, value);
|
||||
conf.set(key + "." + getAccountName(), value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check that recursive deletePath works with paginated enabled and
|
||||
* disabled for both empty and non-empty directory.
|
||||
* When enabled appropriate xMsVersion should be used.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRecursiveDeleteWithPagination() throws Exception {
|
||||
testRecursiveDeleteWithPaginationInternal(false, true,
|
||||
AbfsHttpConstants.ApiVersion.DEC_12_2019);
|
||||
testRecursiveDeleteWithPaginationInternal(false, true,
|
||||
AbfsHttpConstants.ApiVersion.AUG_03_2023);
|
||||
testRecursiveDeleteWithPaginationInternal(false, false,
|
||||
AbfsHttpConstants.ApiVersion.DEC_12_2019);
|
||||
testRecursiveDeleteWithPaginationInternal(false, false,
|
||||
AbfsHttpConstants.ApiVersion.AUG_03_2023);
|
||||
testRecursiveDeleteWithPaginationInternal(true, true,
|
||||
AbfsHttpConstants.ApiVersion.DEC_12_2019);
|
||||
testRecursiveDeleteWithPaginationInternal(true, false,
|
||||
AbfsHttpConstants.ApiVersion.AUG_03_2023);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check that non-recursive delete works with both paginated enabled
|
||||
* and disabled only for empty directories.
|
||||
* Pagination should not be set when recursive is false.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testNonRecursiveDeleteWithPagination() throws Exception {
|
||||
testNonRecursiveDeleteWithPaginationInternal(true);
|
||||
testNonRecursiveDeleteWithPaginationInternal(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check that with pagination enabled, invalid CT will fail
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testRecursiveDeleteWithInvalidCT() throws Exception {
|
||||
testRecursiveDeleteWithInvalidCTInternal(true);
|
||||
testRecursiveDeleteWithInvalidCTInternal(false);
|
||||
}
|
||||
|
||||
private void testRecursiveDeleteWithPaginationInternal(boolean isEmptyDir,
|
||||
boolean isPaginatedDeleteEnabled, AbfsHttpConstants.ApiVersion xMsVersion)
|
||||
throws Exception {
|
||||
final AzureBlobFileSystem fs = getUserFileSystem();
|
||||
TracingContext testTC = getTestTracingContext(fs, true);
|
||||
|
||||
Path testPath;
|
||||
if (isEmptyDir) {
|
||||
testPath = new Path("/emptyPath" + StringUtils.right(
|
||||
UUID.randomUUID().toString(), 10));
|
||||
superUserFs.mkdirs(testPath);
|
||||
} else {
|
||||
testPath = createSmallDir();
|
||||
}
|
||||
|
||||
// Set the paginated enabled value and xMsVersion at spiedClient level.
|
||||
AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient());
|
||||
ITestAbfsClient.setAbfsClientField(spiedClient, "xMsVersion", xMsVersion);
|
||||
Mockito.doReturn(isPaginatedDeleteEnabled).when(spiedClient).getIsPaginatedDeleteEnabled();
|
||||
|
||||
AbfsRestOperation op = spiedClient.deletePath(
|
||||
testPath.toString(), true, null, testTC, isHnsEnabled);
|
||||
|
||||
// Getting the xMsVersion that was used to make the request
|
||||
String xMsVersionUsed = getHeaderValue(op.getRequestHeaders(), X_MS_VERSION);
|
||||
String urlUsed = op.getUrl().toString();
|
||||
|
||||
// Assert that appropriate xMsVersion and query param was used to make request
|
||||
if (isPaginatedDeleteEnabled && isHnsEnabled) {
|
||||
Assertions.assertThat(urlUsed)
|
||||
.describedAs("Url must have paginated = true as query param")
|
||||
.contains(QUERY_PARAM_PAGINATED);
|
||||
if (xMsVersion.compareTo(AbfsHttpConstants.ApiVersion.AUG_03_2023) < 0) {
|
||||
Assertions.assertThat(xMsVersionUsed)
|
||||
.describedAs("Request was made with wrong x-ms-version")
|
||||
.isEqualTo(AbfsHttpConstants.ApiVersion.AUG_03_2023.toString());
|
||||
} else if (xMsVersion.compareTo(AbfsHttpConstants.ApiVersion.AUG_03_2023) >= 0) {
|
||||
Assertions.assertThat(xMsVersionUsed)
|
||||
.describedAs("Request was made with wrong x-ms-version")
|
||||
.isEqualTo(xMsVersion.toString());
|
||||
}
|
||||
} else {
|
||||
Assertions.assertThat(urlUsed)
|
||||
.describedAs("Url must not have paginated = true as query param")
|
||||
.doesNotContain(QUERY_PARAM_PAGINATED);
|
||||
Assertions.assertThat(xMsVersionUsed)
|
||||
.describedAs("Request was made with wrong x-ms-version")
|
||||
.isEqualTo(xMsVersion.toString());
|
||||
}
|
||||
|
||||
// Assert that deletion was successful in every scenario.
|
||||
AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () ->
|
||||
spiedClient.getPathStatus(testPath.toString(), false, testTC, null));
|
||||
assertStatusCode(e, HTTP_NOT_FOUND);
|
||||
}
|
||||
|
||||
private void testNonRecursiveDeleteWithPaginationInternal(boolean isPaginatedDeleteEnabled) throws Exception{
|
||||
final AzureBlobFileSystem fs = getUserFileSystem();
|
||||
TracingContext testTC = getTestTracingContext(fs, true);
|
||||
|
||||
Path testPath = new Path("/emptyPath");
|
||||
superUserFs.mkdirs(testPath);
|
||||
|
||||
// Set the paginated enabled value at spiedClient level.
|
||||
AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient());
|
||||
Mockito.doReturn(isPaginatedDeleteEnabled).when(spiedClient).getIsPaginatedDeleteEnabled();
|
||||
|
||||
AbfsRestOperation op = spiedClient.deletePath(
|
||||
testPath.toString(), false, null, testTC, isHnsEnabled);
|
||||
|
||||
// Getting the url that was used to make the request
|
||||
String urlUsed = op.getUrl().toString();
|
||||
|
||||
// Assert that paginated query param was not set to make request
|
||||
Assertions.assertThat(urlUsed)
|
||||
.describedAs("Url must not have paginated as query param")
|
||||
.doesNotContain(QUERY_PARAM_PAGINATED);
|
||||
|
||||
// Assert that deletion was successful in every scenario.
|
||||
AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () ->
|
||||
spiedClient.getPathStatus(testPath.toString(), false, testTC, null));
|
||||
assertStatusCode(e, HTTP_NOT_FOUND);
|
||||
}
|
||||
|
||||
private void testRecursiveDeleteWithInvalidCTInternal(boolean isPaginatedEnabled) throws Exception {
|
||||
final AzureBlobFileSystem fs = getUserFileSystem();
|
||||
|
||||
Path testPath = createSmallDir();
|
||||
String randomCT = "randomContinuationToken1234";
|
||||
TracingContext testTC = getTestTracingContext(this.testUserFs, true);
|
||||
|
||||
AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient());
|
||||
Mockito.doReturn(isPaginatedEnabled).when(spiedClient).getIsPaginatedDeleteEnabled();
|
||||
|
||||
AbfsRestOperationException e = intercept(AbfsRestOperationException.class, () ->
|
||||
spiedClient.deletePath(testPath.toString(), true, randomCT, testTC, isHnsEnabled));
|
||||
assertStatusCode(e, HTTP_BAD_REQUEST);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide test user with default ACL permissions on root.
|
||||
* @param uid
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setDefaultAclOnRoot(String uid)
|
||||
throws IOException {
|
||||
List<AclEntry> aclSpec = Lists.newArrayList(AclTestHelpers.aclEntry(
|
||||
AclEntryScope.ACCESS, AclEntryType.USER, uid, FsAction.ALL),
|
||||
AclTestHelpers.aclEntry(AclEntryScope.DEFAULT, AclEntryType.USER, uid, FsAction.ALL));
|
||||
// Use SuperUser Privilege to set ACL on root for test user.
|
||||
this.superUserFs.modifyAclEntries(new Path("/"), aclSpec);
|
||||
}
|
||||
|
||||
private Path createSmallDir() throws IOException {
|
||||
String rootPath = "/smallDir" + StringUtils.right(
|
||||
UUID.randomUUID().toString(), 10);
|
||||
String firstFilePath = rootPath + "/placeholderFile";
|
||||
this.superUserFs.create(new Path(firstFilePath));
|
||||
|
||||
for (int i = 1; i <= 2; i++) {
|
||||
String dirPath = "/dirLevel1-" + i + "/dirLevel2-" + i;
|
||||
String filePath = rootPath + dirPath + "/file-" + i;
|
||||
this.superUserFs.create(new Path(filePath));
|
||||
}
|
||||
return new Path(rootPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Select the filesystem to be used for delete API.
|
||||
* For HNS Disabled accounts, test User FS won't have permissions as ACL is not supported
|
||||
* @return
|
||||
*/
|
||||
private AzureBlobFileSystem getUserFileSystem() {
|
||||
return this.isHnsEnabled ? this.testUserFs : this.superUserFs;
|
||||
}
|
||||
|
||||
private void assertStatusCode(final AbfsRestOperationException e, final int statusCode) {
|
||||
Assertions.assertThat(e.getStatusCode())
|
||||
.describedAs("Request Should fail with Bad Request instead of %s",
|
||||
e.toString())
|
||||
.isEqualTo(statusCode);
|
||||
}
|
||||
|
||||
private void assumeTestUserCredentialsConfigured() {
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user