From b15ed27cfbfe80e83ce835aa882cc3f62211b300 Mon Sep 17 00:00:00 2001 From: Anuj Modi <128447756+anujmodi2021@users.noreply.github.com> Date: Tue, 20 Aug 2024 22:37:07 +0530 Subject: [PATCH] HADOOP-19187: [ABFS][FNSOverBlob] AbfsClient Refactoring to Support Multiple Implementation of Clients. (#6879) Refactor AbfsClient into DFS and Blob Client. Contributed by Anuj Modi --- .../src/config/checkstyle-suppressions.xml | 2 + .../hadoop/fs/azurebfs/AbfsConfiguration.java | 83 +- .../fs/azurebfs/AzureBlobFileSystem.java | 56 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 102 +- .../azurebfs/constants/AbfsHttpConstants.java | 2 + .../azurebfs/constants/AbfsServiceType.java | 37 + .../azurebfs/constants/ConfigurationKeys.java | 28 +- .../azurebfs/constants/FSOperationType.java | 3 +- .../constants/FileSystemConfigurations.java | 1 + .../constants/FileSystemUriSchemes.java | 5 +- .../InvalidConfigurationValueException.java | 4 + .../fs/azurebfs/services/AbfsClient.java | 1249 ++++++---------- .../azurebfs/services/AbfsClientHandler.java | 127 ++ .../fs/azurebfs/services/AbfsDfsClient.java | 1302 +++++++++++++++++ .../hadoop/fs/azurebfs/utils/UriUtils.java | 36 + .../src/site/markdown/fns_blob.md | 82 ++ .../hadoop-azure/src/site/markdown/index.md | 1 + .../azurebfs/ITestAbfsCustomEncryption.java | 3 +- .../ITestAzureBlobFileSystemCheckAccess.java | 9 +- ...ITestAzureBlobFileSystemInitAndCreate.java | 44 +- .../fs/azurebfs/ITestGetNameSpaceEnabled.java | 14 +- .../fs/azurebfs/services/ITestAbfsClient.java | 9 +- 22 files changed, 2287 insertions(+), 912 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsServiceType.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java create mode 100644 hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index 2065746b76..27ab432904 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -44,6 +44,8 @@ + HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR) { + LOG.debug("getNamespace failed with non 400 user error", ex); + statIncrement(ERROR_IGNORED); + return true; + } + throw ex; + } + } + private boolean fileSystemExists() throws IOException { LOG.debug( "AzureBlobFileSystem.fileSystemExists uri: {}", uri); @@ -1660,7 +1706,7 @@ AbfsDelegationTokenManager getDelegationTokenManager() { @VisibleForTesting boolean getIsNamespaceEnabled(TracingContext tracingContext) throws AzureBlobFileSystemException { - return abfsStore.getIsNamespaceEnabled(tracingContext); + return getAbfsStore().getIsNamespaceEnabled(tracingContext); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 784e3f25c6..e903421f46 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -29,11 +29,9 @@ import java.net.URISyntaxException; import java.net.URL; import 
java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.time.Instant; @@ -55,11 +53,13 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter; +import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler; import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.fs.PathIOException; @@ -158,6 +158,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; /** @@ -169,6 +170,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class); private AbfsClient client; + + /** + * Variable to hold the client handler which will determine the operative + * client based on the service type configured. + * Initialized in the {@link #initializeClient(URI, String, String, boolean)}. + */ + private AbfsClientHandler clientHandler; private URI uri; private String userName; private String primaryUserGroup; @@ -221,7 +229,8 @@ public AzureBlobFileSystemStore( leaseRefs = Collections.synchronizedMap(new WeakHashMap<>()); try { - this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName); + this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, + accountName, getAbfsServiceTypeFromUrl()); } catch (IllegalAccessException exception) { throw new FileSystemOperationUnhandledException(exception); } @@ -286,6 +295,8 @@ public AzureBlobFileSystemStore( /** * Checks if the given key in Azure Storage should be stored as a page * blob instead of block blob. + * @param key The key to check. + * @return True if the key should be stored as a page blob, false otherwise. 
*/ public boolean isAppendBlobKey(String key) { return isKeyForDirectorySet(key, appendBlobDirSet); @@ -497,15 +508,9 @@ public void setFilesystemProperties( try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties", "setFilesystemProperties")) { - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } - final AbfsRestOperation op = client - .setFilesystemProperties(commaSeparatedProperties, tracingContext); + final AbfsRestOperation op = getClient() + .setFilesystemProperties(properties, tracingContext); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -590,18 +595,13 @@ public void setPathProperties(final Path path, path, properties); - final String commaSeparatedProperties; - try { - commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); - } catch (CharacterCodingException ex) { - throw new InvalidAbfsRestOperationException(ex); - } + final String relativePath = getRelativePath(path); final ContextEncryptionAdapter contextEncryptionAdapter = createEncryptionAdapterFromServerStoreContext(relativePath, tracingContext); final AbfsRestOperation op = client - .setPathProperties(getRelativePath(path), commaSeparatedProperties, + .setPathProperties(getRelativePath(path), properties, tracingContext, contextEncryptionAdapter); contextEncryptionAdapter.destroy(); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -1090,7 +1090,7 @@ public boolean rename(final Path source, final AbfsClientRenameResult abfsClientRenameResult = client.renamePath(sourceRelativePath, destinationRelativePath, continuation, tracingContext, sourceEtag, false, - isNamespaceEnabled); + isNamespaceEnabled); AbfsRestOperation op = abfsClientRenameResult.getOp(); perfInfo.registerResult(op.getResult()); @@ -1369,7 +1369,7 @@ private String generateContinuationTokenForNonXns(String path, final String firs SimpleDateFormat simpleDateFormat = new SimpleDateFormat(TOKEN_DATE_PATTERN, Locale.US); String date = simpleDateFormat.format(new Date()); String token = String.format("%06d!%s!%06d!%s!%06d!%s!", - path.length(), path, startFrom.length(), startFrom, date.length(), date); + path.length(), path, startFrom.length(), startFrom, date.length(), date); String base64EncodedToken = Base64.encode(token.getBytes(StandardCharsets.UTF_8)); StringBuilder encodedTokenBuilder = new StringBuilder(base64EncodedToken.length() + 5); @@ -1810,18 +1810,29 @@ private void initializeClient(URI uri, String fileSystemName, LOG.trace("Initializing AbfsClient for {}", baseUrl); if (tokenProvider != null) { - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, + this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration, tokenProvider, encryptionContextProvider, populateAbfsClientContext()); } else { - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, + this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration, sasTokenProvider, encryptionContextProvider, populateAbfsClientContext()); } + this.client = getClientHandler().getClient(); LOG.trace("AbfsClient init complete"); } + private AbfsServiceType getAbfsServiceTypeFromUrl() { + if (uri.toString().contains(ABFS_BLOB_DOMAIN_NAME)) { + return AbfsServiceType.BLOB; + } + // In case of DFS Domain name or any other custom endpoint, the service + // type is to be identified as default DFS. 
+ LOG.debug("Falling back to default service type DFS"); + return AbfsServiceType.DFS; + } + /** * Populate a new AbfsClientContext instance with the desired properties. * @@ -1861,43 +1872,6 @@ private boolean parseIsDirectory(final String resourceType) { && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); } - /** - * Convert properties stored in a Map into a comma separated string. For map - * , method would convert to: - * key1=value1,key2=value,...,keyN=valueN - * */ - @VisibleForTesting - String convertXmsPropertiesToCommaSeparatedString(final Map properties) throws - CharacterCodingException { - StringBuilder commaSeparatedProperties = new StringBuilder(); - - final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder(); - - for (Map.Entry propertyEntry : properties.entrySet()) { - String key = propertyEntry.getKey(); - String value = propertyEntry.getValue(); - - Boolean canEncodeValue = encoder.canEncode(value); - if (!canEncodeValue) { - throw new CharacterCodingException(); - } - - String encodedPropertyValue = Base64.encode(encoder.encode(CharBuffer.wrap(value)).array()); - commaSeparatedProperties.append(key) - .append(AbfsHttpConstants.EQUAL) - .append(encodedPropertyValue); - - commaSeparatedProperties.append(AbfsHttpConstants.COMMA); - } - - if (commaSeparatedProperties.length() != 0) { - commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1); - } - - return commaSeparatedProperties.toString(); - } - private Hashtable parseCommaSeparatedXmsProperties(String xMsProperties) throws InvalidFileSystemPropertyException, InvalidAbfsRestOperationException { Hashtable properties = new Hashtable<>(); @@ -2176,6 +2150,16 @@ public AbfsClient getClient() { return this.client; } + @VisibleForTesting + public AbfsClient getClient(AbfsServiceType serviceType) { + return getClientHandler().getClient(serviceType); + } + + @VisibleForTesting + public AbfsClientHandler getClientHandler() { + return this.clientHandler; + } + @VisibleForTesting void setClient(AbfsClient client) { this.client = client; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 84127d9d57..26106a717c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -101,6 +101,8 @@ public final class AbfsHttpConstants { public static final String GMT_TIMEZONE = "GMT"; public static final String APPLICATION_JSON = "application/json"; public static final String APPLICATION_OCTET_STREAM = "application/octet-stream"; + public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1"; + public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8"; public static final String ROOT_PATH = "/"; public static final String ACCESS_MASK = "mask:"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsServiceType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsServiceType.java new file mode 100644 index 0000000000..c84d4b0dfa --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsServiceType.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.constants;
+
+/**
+ * Azure Storage offers two sets of REST APIs for interacting with the storage account:
+ * <ol>
+ *   <li>Blob REST API</li>
+ *   <li>Data Lake REST API</li>
+ * </ol>
+ */
+public enum AbfsServiceType {
+  /**
+   * Service type to set the operative endpoint as the Data Lake REST API.
+   */
+  DFS,
+  /**
+   * Service type to set the operative endpoint as the Blob REST API.
+   */
+  BLOB
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index ed749c7885..620182f5bd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -32,9 +32,35 @@ public final class ConfigurationKeys {
   /**
    * Config to specify if the configured account is HNS enabled or not. If
    * this config is not set, getacl call is made on account filesystem root
-   * path to determine HNS status.
+   * path on the DFS endpoint to determine HNS status.
    */
   public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
+
+  /**
+   * Config to specify which {@link AbfsServiceType} to use with an HNS-disabled account.
+   * The default value is inferred from the URL used to initialize the filesystem.
+   * This override makes it possible to choose the service endpoint in cases where a
+   * local DNS resolution is set up for the account and the driver is unable to detect
+   * the intended endpoint from the URL used to initialize the filesystem.
+   * If Blob is configured for an HNS-enabled account, FS init will fail.
+   * Value: {@value}, case-insensitive "DFS" or "BLOB".
+   */
+  public static final String FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE = "fs.azure.fns.account.service.type";
+
+  /**
+   * Config to specify which {@link AbfsServiceType} to use only for ingress operations.
+   * Other operations will continue to use the service endpoint configured for the filesystem.
+   * Value: {@value}, case-insensitive "DFS" or "BLOB".
+   */
+  public static final String FS_AZURE_INGRESS_SERVICE_TYPE = "fs.azure.ingress.service.type";
+
+  /**
+   * Config to be set only for cases where traffic over the DFS endpoint is
+   * experiencing compatibility issues and needs to move to Blob for mitigation.
+   * Value: {@value}, case-insensitive "True" or "False".
+   */
+  public static final String FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = "fs.azure.enable.dfstoblob.fallback";
+
   /**
    * Enable or disable expect hundred continue header.
    * Value: {@value}.
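
Taken together, the three new keys control which endpoint the driver talks to: an account-level override for FNS accounts, an ingress-only override, and a DFS-to-Blob fallback switch. A minimal sketch of how a client could set them, assuming only the key names defined above (the values shown are illustrative choices, not defaults):

```java
import org.apache.hadoop.conf.Configuration;

public class FnsOverBlobConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // FNS (HNS-disabled) account behind a custom DNS name: pin the
    // account-level endpoint to Blob, since URL-based detection cannot
    // recognize a custom domain and would fall back to DFS.
    conf.set("fs.azure.fns.account.service.type", "BLOB");
    // Route only ingress (write) operations to DFS; all other operations
    // keep using the service type configured for the filesystem.
    conf.set("fs.azure.ingress.service.type", "DFS");
    // Permit falling back from DFS to Blob when the DFS endpoint hits
    // compatibility issues.
    conf.setBoolean("fs.azure.enable.dfstoblob.fallback", true);
    System.out.println(conf.get("fs.azure.fns.account.service.type"));
  }
}
```
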
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java index 6b6e98c9c7..8c9c8af75b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FSOperationType.java @@ -45,7 +45,8 @@ public enum FSOperationType { SET_OWNER("SO"), SET_ACL("SA"), TEST_OP("TS"), - WRITE("WR"); + WRITE("WR"), + INIT("IN"); private final String opCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index bd2d6e4b57..f8c97b031b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -32,6 +32,7 @@ public final class FileSystemConfigurations { public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = ""; + public static final boolean DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = false; public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true; public static final String USER_HOME_DIRECTORY_PREFIX = "/user"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java index c7a0cdad60..0b5cba72f1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemUriSchemes.java @@ -38,5 +38,8 @@ public final class FileSystemUriSchemes { public static final String WASB_SECURE_SCHEME = "wasbs"; public static final String WASB_DNS_PREFIX = "blob"; + public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net"; + public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net"; + private FileSystemUriSchemes() {} -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidConfigurationValueException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidConfigurationValueException.java index 7591bac59e..fef2b073ce 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidConfigurationValueException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/InvalidConfigurationValueException.java @@ -34,4 +34,8 @@ public InvalidConfigurationValueException(String configKey, Exception innerExcep public InvalidConfigurationValueException(String configKey) { super("Invalid configuration value detected for " + configKey); } + + public InvalidConfigurationValueException(String configKey, String message) { + super(String.format("Invalid configuration value detected for \"%s\". 
%s ", configKey, message)); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index a2d65c145b..ca35015b19 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -31,17 +31,18 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Base64; +import java.util.Hashtable; import java.util.List; import java.util.Locale; import java.util.Timer; import java.util.TimerTask; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; @@ -67,9 +68,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; @@ -83,25 +83,52 @@ import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.util.concurrent.HadoopExecutors; -import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CLIENT_VERSION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH_ENCODE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VENDOR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.JAVA_VERSION; +import 
static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MD5; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_ARCH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.OS_VERSION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PLUS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.PLUS_ENCODE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UTF_8; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; -import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; -import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; -import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; -import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_VERSION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_TIMEOUT; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_ABBREVIATION; /** * AbfsClient. */ -public class AbfsClient implements Closeable { +public abstract class AbfsClient implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON; @@ -147,7 +174,7 @@ public class AbfsClient implements Closeable { /** * logging the rename failure if metadata is in an incomplete state. 
*/ - private static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG); + protected static final LogExactlyOnce ABFS_METADATA_INCOMPLETE_RENAME_FAILURE = new LogExactlyOnce(LOG); private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -255,7 +282,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final SASTokenProvider sasTokenProvider, - final EncryptionContextProvider encryptionContextProvider, + final EncryptionContextProvider encryptionContextProvider, final AbfsClientContext abfsClientContext) throws IOException { this(baseUrl, sharedKeyCredentials, abfsConfiguration, @@ -335,22 +362,25 @@ AbfsThrottlingIntercept getIntercept() { * @return default request headers */ @VisibleForTesting - protected List createDefaultHeaders() { - return createDefaultHeaders(this.xMsVersion); - } + protected abstract List createDefaultHeaders(); /** * Create request headers for Rest Operation using the specified API version. - * @param xMsVersion + * @param xMsVersion Azure services API version to be used. * @return default request headers */ - private List createDefaultHeaders(ApiVersion xMsVersion) { + @VisibleForTesting + public abstract List createDefaultHeaders(ApiVersion xMsVersion); + + /** + * Create request headers common to both service endpoints. + * @param xMsVersion azure services API version to be used. + * @return common request headers + */ + protected List createCommonHeaders(ApiVersion xMsVersion) { final List requestHeaders = new ArrayList(); requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion.toString())); - requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON - + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM)); - requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, - UTF_8)); + requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, UTF_8)); requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING)); requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent)); return requestHeaders; @@ -372,8 +402,15 @@ private List createDefaultHeaders(ApiVersion xMsVersion) { *
 *   <li>getPathStatus for fs.setXAttr and fs.getXAttr</li>
 *   <li>read</li>
 * </ol>
 *
+ * @param path path of the file / directory to be created / overwritten.
+ * @param requestHeaders list of headers to be added to the request.
+ * @param isCreateFileRequest defines if a file or a directory has to be created / overwritten.
+ * @param contextEncryptionAdapter object that contains the encryptionContext and
+ * encryptionKey created from the developer-provided implementation of {@link EncryptionContextProvider}.
+ * @param tracingContext to trace service calls.
+ * @throws AzureBlobFileSystemException if namespace is not enabled.
  */
-  private void addEncryptionKeyRequestHeaders(String path,
+  protected void addEncryptionKeyRequestHeaders(String path,
       List<AbfsHttpHeader> requestHeaders, boolean isCreateFileRequest,
       ContextEncryptionAdapter contextEncryptionAdapter,
       TracingContext tracingContext) throws AzureBlobFileSystemException {
@@ -405,106 +442,66 @@ private void addEncryptionKeyRequestHeaders(String path,
         SERVER_SIDE_ENCRYPTION_ALGORITHM));
   }

-  AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
+  /**
+   * Creates an AbfsUriQueryBuilder with the default query parameter timeout.
+   * @return default AbfsUriQueryBuilder.
+   */
+  protected AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
     return abfsUriQueryBuilder;
   }

-  public AbfsRestOperation createFilesystem(TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+  /**
+   * Create a new filesystem using the Azure REST API service.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract AbfsRestOperation createFilesystem(TracingContext tracingContext)
+      throws AzureBlobFileSystemException;

-    final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
-    abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
+  /**
+   * Sets user-defined metadata on the filesystem.
+   * @param properties list of metadata key-value pairs.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, String> properties,
+      TracingContext tracingContext) throws AzureBlobFileSystemException;

-    final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = getAbfsRestOperation(
-        AbfsRestOperationType.CreateFileSystem,
-        HTTP_METHOD_PUT, url, requestHeaders);
-    op.execute(tracingContext);
-    return op;
-  }
+  /**
+   * List paths and their properties in the current filesystem.
+   * @param relativePath to return only blobs within this directory.
+   * @param recursive to return all blobs in the path, including those in subdirectories.
+   * @param listMaxResults maximum number of blobs to return.
+   * @param continuation marker to specify the continuation token.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation or response parsing fails.
+ */ + public abstract AbfsRestOperation listPath(String relativePath, boolean recursive, + int listMaxResults, String continuation, TracingContext tracingContext) + throws IOException; - public AbfsRestOperation setFilesystemProperties(final String properties, - TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - // JDK7 does not support PATCH, so to work around the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); + /** + * Retrieves user-defined metadata on filesystem. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + * */ + public abstract AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) + throws AzureBlobFileSystemException; - requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, - properties)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - - final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetFileSystemProperties, - HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults, - final String continuation, TracingContext tracingContext) - throws IOException { - final List requestHeaders = createDefaultHeaders(); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, getDirectoryQueryParameter(relativePath)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); - appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.ListPaths, - HTTP_METHOD_GET, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - - final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.GetFileSystemProperties, - HTTP_METHOD_HEAD, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - 
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - - final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.DeleteFileSystem, - HTTP_METHOD_DELETE, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } + /** + * Deletes the filesystem using Azure REST API Service. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation deleteFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException; /** * Method for calling createPath API to the backend. Method can be called from: @@ -533,150 +530,57 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) throws * @throws AzureBlobFileSystemException throws back the exception it receives from the * {@link AbfsRestOperation#execute(TracingContext)} method call. */ - public AbfsRestOperation createPath(final String path, - final boolean isFile, - final boolean overwrite, - final Permissions permissions, - final boolean isAppendBlob, - final String eTag, - final ContextEncryptionAdapter contextEncryptionAdapter, - final TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - if (isFile) { - addEncryptionKeyRequestHeaders(path, requestHeaders, true, - contextEncryptionAdapter, tracingContext); - } - if (!overwrite) { - requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); - } + public abstract AbfsRestOperation createPath(String path, + boolean isFile, + boolean overwrite, + Permissions permissions, + boolean isAppendBlob, + String eTag, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws AzureBlobFileSystemException; - if (permissions.hasPermission()) { - requestHeaders.add( - new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, - permissions.getPermission())); - } + /** + * Acquire lease on specified path. + * @param path on which lease has to be acquired. + * @param duration for which lease has to be acquired. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation acquireLease(String path, int duration, + TracingContext tracingContext) throws AzureBlobFileSystemException; - if (permissions.hasUmask()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, - permissions.getUmask())); - } + /** + * Renew lease on specified path. + * @param path on which lease has to be renewed. + * @param leaseId of the lease to be renewed. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation renewLease(String path, String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException; - if (eTag != null && !eTag.isEmpty()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); - } + /** + * Release lease on specified path. + * @param path on which lease has to be released. + * @param leaseId of the lease to be released. + * @param tracingContext for tracing the server calls. 
+ * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation releaseLease(String path, String leaseId, + TracingContext tracingContext) throws AzureBlobFileSystemException; - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); - if (isAppendBlob) { - abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE); - } - - String operation = isFile - ? SASTokenProvider.CREATE_FILE_OPERATION - : SASTokenProvider.CREATE_DIRECTORY_OPERATION; - appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.CreatePath, - HTTP_METHOD_PUT, - url, - requestHeaders); - try { - op.execute(tracingContext); - } catch (AzureBlobFileSystemException ex) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { - throw ex; - } - if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { - String existingResource = - op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE); - if (existingResource != null && existingResource.equals(DIRECTORY)) { - return op; //don't throw ex on mkdirs for existing directory - } - } - throw ex; - } - return op; - } - - public AbfsRestOperation acquireLease(final String path, int duration, TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); - requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString())); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.LeasePath, - HTTP_METHOD_POST, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation renewLease(final String path, final String leaseId, - TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.LeasePath, - HTTP_METHOD_POST, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation releaseLease(final String path, - final String leaseId, TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION)); - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = 
getAbfsRestOperation(
-        AbfsRestOperationType.LeasePath,
-        HTTP_METHOD_POST,
-        url,
-        requestHeaders);
-    op.execute(tracingContext);
-    return op;
-  }
-
-  public AbfsRestOperation breakLease(final String path,
-      TracingContext tracingContext) throws AzureBlobFileSystemException {
-    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-
-    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
-    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
-
-    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-
-    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = getAbfsRestOperation(
-        AbfsRestOperationType.LeasePath,
-        HTTP_METHOD_POST,
-        url,
-        requestHeaders);
-    op.execute(tracingContext);
-    return op;
-  }
+  /**
+   * Break lease on specified path.
+   * @param path on which lease has to be broken.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract AbfsRestOperation breakLease(String path,
+      TracingContext tracingContext) throws AzureBlobFileSystemException;

   /**
    * Rename a file or directory.
@@ -700,127 +604,29 @@ public AbfsRestOperation breakLease(final String path,
    * AbfsRest operation, rename recovery and incomplete metadata state failure.
    * @throws AzureBlobFileSystemException failure, excluding any recovery from overload failures.
    */
-  public AbfsClientRenameResult renamePath(
-      final String source,
-      final String destination,
-      final String continuation,
-      final TracingContext tracingContext,
-      String sourceEtag,
-      boolean isMetadataIncompleteState,
-      boolean isNamespaceEnabled)
-      throws IOException {
-    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+  public abstract AbfsClientRenameResult renamePath(
+      String source,
+      String destination,
+      String continuation,
+      TracingContext tracingContext,
+      String sourceEtag,
+      boolean isMetadataIncompleteState,
+      boolean isNamespaceEnabled)
+      throws IOException;

-    final boolean hasEtag = !isEmpty(sourceEtag);
-
-    boolean shouldAttemptRecovery = renameResilience && isNamespaceEnabled;
-    if (!hasEtag && shouldAttemptRecovery) {
-      // in case eTag is already not supplied to the API
-      // and rename resilience is expected and it is an HNS enabled account
-      // fetch the source etag to be used later in recovery
-      try {
-        final AbfsRestOperation srcStatusOp = getPathStatus(source,
-            false, tracingContext, null);
-        if (srcStatusOp.hasResult()) {
-          final AbfsHttpOperation result = srcStatusOp.getResult();
-          sourceEtag = extractEtagHeader(result);
-          // and update the directory status.
- boolean isDir = checkIsDir(result); - shouldAttemptRecovery = !isDir; - LOG.debug("Retrieved etag of source for rename recovery: {}; isDir={}", sourceEtag, isDir); - } - } catch (AbfsRestOperationException e) { - throw new AbfsRestOperationException(e.getStatusCode(), SOURCE_PATH_NOT_FOUND.getErrorCode(), - e.getMessage(), e); - } - - } - - String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); - if (authType == AuthType.SAS) { - final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); - appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder); - encodedRenameSource += srcQueryBuilder.toString(); - } - - LOG.trace("Rename source queryparam added {}", encodedRenameSource); - requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); - requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); - appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); - try { - incrementAbfsRenamePath(); - op.execute(tracingContext); - // AbfsClientResult contains the AbfsOperation, If recovery happened or - // not, and the incompleteMetaDataState is true or false. - // If we successfully rename a path and isMetadataIncompleteState was - // true, then rename was recovered, else it didn't, this is why - // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). - return new AbfsClientRenameResult(op, isMetadataIncompleteState, isMetadataIncompleteState); - } catch (AzureBlobFileSystemException e) { - // If we have no HTTP response, throw the original exception. - if (!op.hasResult()) { - throw e; - } - - // ref: HADOOP-18242. Rename failure occurring due to a rare case of - // tracking metadata being in incomplete state. - if (op.getResult().getStorageErrorCode() - .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) - && !isMetadataIncompleteState) { - //Logging - ABFS_METADATA_INCOMPLETE_RENAME_FAILURE - .info("Rename Failure attempting to resolve tracking metadata state and retrying."); - // rename recovery should be attempted in this case also - shouldAttemptRecovery = true; - isMetadataIncompleteState = true; - String sourceEtagAfterFailure = sourceEtag; - if (isEmpty(sourceEtagAfterFailure)) { - // Doing a HEAD call resolves the incomplete metadata state and - // then we can retry the rename operation. - AbfsRestOperation sourceStatusOp = getPathStatus(source, false, - tracingContext, null); - isMetadataIncompleteState = true; - // Extract the sourceEtag, using the status Op, and set it - // for future rename recovery. - AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); - sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); - } - renamePath(source, destination, continuation, tracingContext, - sourceEtagAfterFailure, isMetadataIncompleteState, isNamespaceEnabled); - } - // if we get out of the condition without a successful rename, then - // it isn't metadata incomplete state issue. 
- isMetadataIncompleteState = false; - - // setting default rename recovery success to false - boolean etagCheckSucceeded = false; - if (shouldAttemptRecovery) { - etagCheckSucceeded = renameIdempotencyCheckOp( - source, - sourceEtag, op, destination, tracingContext); - } - if (!etagCheckSucceeded) { - // idempotency did not return different result - // throw back the exception - throw e; - } - return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); - } - } - - private boolean checkIsDir(AbfsHttpOperation result) { - String resourceType = result.getResponseHeader( - HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - return resourceType != null - && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); - } + /** + * Checks if the rest operation results indicate if the path is a directory. + * @param result executed rest operation containing response from server. + * @return True if the path is a directory, False otherwise. + */ + protected abstract boolean checkIsDir(AbfsHttpOperation result); + /** + * Creates a rest operation for rename. + * @param url to be used for the operation. + * @param requestHeaders list of headers to be added to the request. + * @return un-executed rest operation. + */ @VisibleForTesting AbfsRestOperation createRenameRestOperation(URL url, List requestHeaders) { AbfsRestOperation op = getAbfsRestOperation( @@ -831,7 +637,11 @@ AbfsRestOperation createRenameRestOperation(URL url, List reques return op; } - private void incrementAbfsRenamePath() { + /** + * Increments AbfsCounters for rename path attempts by 1. + * Will be called each time a rename path operation is attempted. + */ + protected void incrementAbfsRenamePath() { abfsCounters.incrementCounter(RENAME_PATH_ATTEMPTS, 1); } @@ -896,142 +706,35 @@ public boolean renameIdempotencyCheckOp( return false; } - @VisibleForTesting - boolean isSourceDestEtagEqual(String sourceEtag, AbfsHttpOperation result) { - return sourceEtag.equals(extractEtagHeader(result)); - } - - public AbfsRestOperation append(final String path, final byte[] buffer, - AppendRequestParameters reqParams, final String cachedSasToken, + /** + * Uploads data to be appended to a file. + * @param path to which data has to be appended. + * @param buffer containing data to be appended. + * @param reqParams containing parameters for append operation like offset, length etc. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation append(String path, byte[] buffer, + AppendRequestParameters reqParams, String cachedSasToken, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - if (reqParams.isExpectHeaderEnabled()) { - requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); - } - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. 
- requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - if (reqParams.getLeaseId() != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); - } - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(reqParams.getPosition())); - - if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || ( - reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) { - abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE); - if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE); - } - } - - // Check if the retry is with "Expect: 100-continue" header being present in the previous request. - if (reqParams.isRetryDueToExpect()) { - String userAgentRetry = userAgent; - // Remove the specific marker related to "Expect: 100-continue" from the User-Agent string. - userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING); - requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT)); - requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry)); - } - - // Add MD5 Hash of request content as request header if feature is enabled - if (isChecksumValidationEnabled()) { - addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); - } - - // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance - String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, - abfsUriQueryBuilder, cachedSasToken); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.Append, - HTTP_METHOD_PUT, - url, - requestHeaders, - buffer, - reqParams.getoffset(), - reqParams.getLength(), - sasTokenForReuse); - try { - op.execute(tracingContext); - } catch (AbfsRestOperationException e) { - /* - If the http response code indicates a user error we retry - the same append request with expect header being disabled. - When "100-continue" header is enabled but a non Http 100 response comes, - the response message might not get set correctly by the server. - So, this handling is to avoid breaking of backward compatibility - if someone has taken dependency on the exception message, - which is created using the error string present in the response header. - */ - int responseStatusCode = e.getStatusCode(); - if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) { - LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path); - reqParams.setExpectHeaderEnabled(false); - reqParams.setRetryDueToExpect(true); - return this.append(path, buffer, reqParams, cachedSasToken, - contextEncryptionAdapter, tracingContext); - } - // If we have no HTTP response, throw the original exception. 
- if (!op.hasResult()) { - throw e; - } - - if (isMd5ChecksumError(e)) { - throw new AbfsInvalidChecksumException(e); - } - - if (reqParams.isAppendBlob() - && appendSuccessCheckOp(op, path, - (reqParams.getPosition() + reqParams.getLength()), tracingContext)) { - final AbfsRestOperation successOp = getAbfsRestOperation( - AbfsRestOperationType.Append, - HTTP_METHOD_PUT, - url, - requestHeaders, - buffer, - reqParams.getoffset(), - reqParams.getLength(), - sasTokenForReuse); - successOp.hardSetResult(HttpURLConnection.HTTP_OK); - return successOp; - } - throw e; - } - - catch (AzureBlobFileSystemException e) { - // Any server side issue will be returned as AbfsRestOperationException and will be handled above. - LOG.debug("Append request failed with non server issues for path: {}, offset: {}, position: {}", - path, reqParams.getoffset(), reqParams.getPosition()); - throw e; - } - - return op; - } + throws AzureBlobFileSystemException; /** * Returns true if the status code lies in the range of user error. * @param responseStatusCode http response status code. * @return True or False. */ - private boolean checkUserError(int responseStatusCode) { - return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST - && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR); - } + public abstract boolean checkUserError(int responseStatusCode); /** * To check if the failure exception returned by server is due to MD5 Mismatch * @param e Exception returned by AbfsRestOperation * @return boolean whether exception is due to MD5Mismatch or not */ - private boolean isMd5ChecksumError(final AbfsRestOperationException e) { + protected boolean isMd5ChecksumError(final AbfsRestOperationException e) { AzureServiceErrorCode storageErrorCode = e.getErrorCode(); return storageErrorCode == AzureServiceErrorCode.MD5_MISMATCH; } @@ -1040,7 +743,7 @@ private boolean isMd5ChecksumError(final AbfsRestOperationException e) { // However a retry would fail with an InvalidQueryParameterValue // (as the current offset would be unacceptable). // Hence, we pass/succeed the appendblob append call - // in case we are doing a retry after checking the length of the file + // in case we are doing a retry after checking the length of the file. public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, final long length, TracingContext tracingContext) throws AzureBlobFileSystemException { @@ -1059,203 +762,111 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, return false; } - public AbfsRestOperation flush(final String path, final long position, + /** + * Flush previously uploaded data to a file. + * @param path on which data has to be flushed. + * @param position to which data has to be flushed. + * @param retainUncommittedData whether to retain uncommitted data after flush. + * @param isClose specify if this is the last flush to the file. + * @param cachedSasToken to be used for the authenticating operation. + * @param leaseId if there is an active lease on the path. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + public abstract AbfsRestOperation flush(String path, long position, boolean retainUncommittedData, boolean isClose, - final String cachedSasToken, final String leaseId, + String cachedSasToken, String leaseId, ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - if (leaseId != null) { - requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); - } + throws AzureBlobFileSystemException; - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); + /** + * Flush previously uploaded data to a file. + * @param buffer containing blockIds to be flushed. + * @param path on which data has to be flushed. + * @param isClose specify if this is the last flush to the file. + * @param cachedSasToken to be used for the authenticating operation. + * @param leaseId if there is an active lease on the path. + * @param eTag to specify conditional headers. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation flush(byte[] buffer, + String path, + boolean isClose, + String cachedSasToken, + String leaseId, + String eTag, + TracingContext tracingContext) throws AzureBlobFileSystemException; - // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance - String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, - abfsUriQueryBuilder, cachedSasToken); + /** + * Set the properties of a file or directory. + * @param path on which properties have to be set. + * @param properties list of metadata key-value pairs. + * @param tracingContext for tracing the server calls. + * @param contextEncryptionAdapter to provide encryption context. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation setPathProperties(String path, Hashtable properties, + TracingContext tracingContext, ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException; - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.Flush, - HTTP_METHOD_PUT, - url, - requestHeaders, sasTokenForReuse); - op.execute(tracingContext); - return op; - } + /** + * Get the properties of a file or directory. + * @param path of which properties have to be fetched. + * @param includeProperties to include user defined properties. + * @param tracingContext for tracing the server calls. + * @param contextEncryptionAdapter to provide encryption context. 
+ * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation getPathStatus(String path, + boolean includeProperties, TracingContext tracingContext, + ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException; - public AbfsRestOperation setPathProperties(final String path, final String properties, - final TracingContext tracingContext, final ContextEncryptionAdapter contextEncryptionAdapter) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - - requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION); - appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetPathProperties, - HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation getPathStatus(final String path, - final boolean includeProperties, final TracingContext tracingContext, - final ContextEncryptionAdapter contextEncryptionAdapter) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - String operation = SASTokenProvider.GET_PROPERTIES_OPERATION; - if (!includeProperties) { - // The default action (operation) is implicitly to get properties and this action requires read permission - // because it reads user defined properties. If the action is getStatus or getAclStatus, then - // only traversal (execute) permission is required. - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS); - operation = SASTokenProvider.GET_STATUS_OPERATION; - } else { - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, - tracingContext); - } - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); - appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.GetPathStatus, - HTTP_METHOD_HEAD, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation read(final String path, - final long position, - final byte[] buffer, - final int bufferOffset, - final int bufferLength, - final String eTag, + /** + * Read the contents of the file at specified path. + * @param path of the file to be read. + * @param position in the file from where data has to be read. + * @param buffer to store the data read. + * @param bufferOffset offset in the buffer to start storing the data. + * @param bufferLength length of data to be read. + * @param eTag to specify conditional headers. 
+ * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation read(String path, + long position, + byte[] buffer, + int bufferOffset, + int bufferLength, + String eTag, String cachedSasToken, ContextEncryptionAdapter contextEncryptionAdapter, - TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - addEncryptionKeyRequestHeaders(path, requestHeaders, false, - contextEncryptionAdapter, tracingContext); - AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, - String.format("bytes=%d-%d", position, position + bufferLength - 1)); - requestHeaders.add(rangeHeader); - requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + TracingContext tracingContext) throws AzureBlobFileSystemException; - // Add request header to fetch MD5 Hash of data returned by server. - if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { - requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE)); - } - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - - // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance - String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, - abfsUriQueryBuilder, cachedSasToken); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.ReadFile, - HTTP_METHOD_GET, - url, - requestHeaders, - buffer, - bufferOffset, - bufferLength, sasTokenForReuse); - op.execute(tracingContext); - - // Verify the MD5 hash returned by server holds valid on the data received. - if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { - verifyCheckSumForRead(buffer, op.getResult(), bufferOffset); - } - - return op; - } - - public AbfsRestOperation deletePath(final String path, final boolean recursive, - final String continuation, - TracingContext tracingContext, - final boolean isNamespaceEnabled) - throws AzureBlobFileSystemException { - /* - * If Pagination is enabled and current API version is old, - * use the minimum required version for pagination. - * If Pagination is enabled and current API version is later than minimum required - * version for pagination, use current version only as azure service is backward compatible. - * If pagination is disabled, use the current API version only. - */ - final List requestHeaders = (isPaginatedDelete(recursive, - isNamespaceEnabled) && xMsVersion.compareTo(ApiVersion.AUG_03_2023) < 0) - ? createDefaultHeaders(ApiVersion.AUG_03_2023) - : createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - - if (isPaginatedDelete(recursive, isNamespaceEnabled)) { - // Add paginated query parameter - abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE); - } - - abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); - String operation = recursive ? 
 SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
-    appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
-
-    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
-    final AbfsRestOperation op = new AbfsRestOperation(
-        AbfsRestOperationType.DeletePath,
-        this,
-        HTTP_METHOD_DELETE,
-        url,
-        requestHeaders,
-        abfsConfiguration);
-    try {
-      op.execute(tracingContext);
-    } catch (AzureBlobFileSystemException e) {
-      // If we have no HTTP response, throw the original exception.
-      if (!op.hasResult()) {
-        throw e;
-      }
-      final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
-      if (idempotencyOp.getResult().getStatusCode()
-          == op.getResult().getStatusCode()) {
-        // idempotency did not return different result
-        // throw back the exception
-        throw e;
-      } else {
-        return idempotencyOp;
-      }
-    }
-
-    return op;
-  }
+  /**
+   * Delete the file or directory at specified path.
+   * @param path to be deleted.
+   * @param recursive if the path is a directory, delete recursively.
+   * @param continuation to specify continuation token.
+   * @param tracingContext for tracing the server calls.
+   * @param isNamespaceEnabled specify if the namespace is enabled.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract AbfsRestOperation deletePath(String path, boolean recursive,
+      String continuation,
+      TracingContext tracingContext,
+      boolean isNamespaceEnabled)
+      throws AzureBlobFileSystemException;

   /**
    * Check if the delete request failure is post a retry and if delete failure
@@ -1269,8 +880,8 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
    * delete issued from this filesystem instance.
    * These are few corner cases and usually returning a success at this stage
    * should help the job to continue.
-   * @param op Delete request REST operation response with non-null HTTP response
-   * @return REST operation response post idempotency check
+   * @param op Delete request REST operation response with non-null HTTP response.
+   * @return REST operation response post idempotency check.
    */
   public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
     Preconditions.checkArgument(op.hasResult(), "Operations has null HTTP response");
@@ -1292,117 +903,79 @@ public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
     return op;
   }

-  public AbfsRestOperation setOwner(final String path, final String owner, final String group,
-      TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
-    // JDK7 does not support PATCH, so to workaround the issue we will use
-    // PUT and specify the real method in the X-Http-Method-Override header.
-    requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
-        HTTP_METHOD_PATCH));
+  /**
+   * Sets the owner on the path.
+   * @param path on which owner has to be set.
+   * @param owner to be set.
+   * @param group to be set.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
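The retry contract the abstract deletePath inherits from the removed DFS implementation: when a delete fails with an HTTP response, deleteIdempotencyCheckOp is consulted, and the failure is surfaced only if the check returns the same status code. A simplified sketch of that control flow (types reduced to ints, names illustrative):

    class DeleteRetrySketch {
      // Mirrors the removed deletePath error handling: a differing status from
      // the idempotency check means a prior or concurrent delete already
      // succeeded, so the operation is treated as successful.
      static int resolveDeleteFailure(int failedStatus, int idempotencyStatus) {
        if (idempotencyStatus == failedStatus) {
          // idempotency did not return a different result: rethrow
          throw new IllegalStateException("delete failed: HTTP " + failedStatus);
        }
        return idempotencyStatus;
      }
    }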
+ */ + public abstract AbfsRestOperation setOwner(String path, String owner, String group, + TracingContext tracingContext) + throws AzureBlobFileSystemException; - if (owner != null && !owner.isEmpty()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner)); - } - if (group != null && !group.isEmpty()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group)); - } - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetOwner, - AbfsHttpConstants.HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } - - public AbfsRestOperation setPermission(final String path, final String permission, - TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission)); - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetPermissions, - AbfsHttpConstants.HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } + /** + * Sets the permission on the path. + * @param path on which permission has to be set. + * @param permission to be set. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation setPermission(String path, String permission, + TracingContext tracingContext) + throws AzureBlobFileSystemException; + /** + * Sets the ACL. + * @param path on which ACL has to be set. + * @param aclSpecString to be set. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ public AbfsRestOperation setAcl(final String path, final String aclSpecString, - TracingContext tracingContext) throws AzureBlobFileSystemException { - return setAcl(path, aclSpecString, AbfsHttpConstants.EMPTY_STRING, tracingContext); + TracingContext tracingContext) throws AzureBlobFileSystemException { + return setAcl(path, aclSpecString, EMPTY_STRING, tracingContext); } - public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag, - TracingContext tracingContext) - throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - // JDK7 does not support PATCH, so to workaround the issue we will use - // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, - HTTP_METHOD_PATCH)); - - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString)); - - if (eTag != null && !eTag.isEmpty()) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); - } - - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.SetAcl, - AbfsHttpConstants.HTTP_METHOD_PUT, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } + /** + * Sets the ACL on the path that matches ETag. + * @param path on which ACL has to be set. + * @param aclSpecString to be set. + * @param eTag to specify conditional headers. Set only if etag matches. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation setAcl(String path, String aclSpecString, String eTag, + TracingContext tracingContext) + throws AzureBlobFileSystemException; + /** + * Retrieves the ACL properties of blob at specified path. + * @param path of which properties have to be fetched. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
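The eTag parameter of the conditional setAcl overload follows the optimistic-concurrency pattern visible in the removed implementation: the ACL is applied only if the blob still carries the eTag the caller read. A hedged sketch of just the header wiring (plain HttpURLConnection used for illustration only):

    import java.net.HttpURLConnection;

    class ConditionalUpdateSketch {
      static void addConditionalHeaders(HttpURLConnection conn, String eTag) {
        // PATCH is tunnelled through PUT, as in the client code above.
        conn.setRequestProperty("X-Http-Method-Override", "PATCH");
        if (eTag != null && !eTag.isEmpty()) {
          // The server rejects the update with a precondition failure if the
          // resource changed since this eTag was observed.
          conn.setRequestProperty("If-Match", eTag);
        }
      }
    }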
+ */ public AbfsRestOperation getAclStatus(final String path, TracingContext tracingContext) throws AzureBlobFileSystemException { return getAclStatus(path, abfsConfiguration.isUpnUsed(), tracingContext); } - public AbfsRestOperation getAclStatus(final String path, final boolean useUPN, - TracingContext tracingContext) throws AzureBlobFileSystemException { - final List requestHeaders = createDefaultHeaders(); - final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN)); - appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, abfsUriQueryBuilder); - - final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - final AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.GetAcl, - AbfsHttpConstants.HTTP_METHOD_HEAD, - url, - requestHeaders); - op.execute(tracingContext); - return op; - } + /** + * Retrieves the ACL properties of blob at specified path. + * @param path of which properties have to be fetched. + * @param useUPN whether to use UPN with rest operation. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract AbfsRestOperation getAclStatus(String path, boolean useUPN, + TracingContext tracingContext) throws AzureBlobFileSystemException; /** * Talks to the server to check whether the permission specified in @@ -1414,21 +987,8 @@ public AbfsRestOperation getAclStatus(final String path, final boolean useUPN, * @return The {@link AbfsRestOperation} object for the operation * @throws AzureBlobFileSystemException in case of bad requests */ - public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tracingContext) - throws AzureBlobFileSystemException { - AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS); - abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx); - appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder); - URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); - AbfsRestOperation op = getAbfsRestOperation( - AbfsRestOperationType.CheckAccess, - AbfsHttpConstants.HTTP_METHOD_HEAD, - url, - createDefaultHeaders()); - op.execute(tracingContext); - return op; - } + public abstract AbfsRestOperation checkAccess(String path, String rwx, TracingContext tracingContext) + throws AzureBlobFileSystemException; /** * Get the directory query parameter used by the List Paths REST API and used @@ -1442,7 +1002,7 @@ public AbfsRestOperation checkAccess(String path, String rwx, TracingContext tra public static String getDirectoryQueryParameter(final String path) { String directory = path; if (Strings.isNullOrEmpty(directory)) { - directory = AbfsHttpConstants.EMPTY_STRING; + directory = EMPTY_STRING; } else if (directory.charAt(0) == '/') { directory = directory.substring(1); } @@ -1451,29 +1011,29 @@ public static String getDirectoryQueryParameter(final String path) { /** * If configured for SAS AuthType, appends SAS token to queryBuilder. - * @param path - * @param operation - * @param queryBuilder + * @param path for which SAS token is required. + * @param operation for which SAS token is required. 
+ * @param queryBuilder to which SAS token is appended. * @return sasToken - returned for optional re-use. - * @throws SASTokenProviderException + * @throws SASTokenProviderException if SAS token cannot be acquired. */ - private String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException { + protected String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException { return appendSASTokenToQuery(path, operation, queryBuilder, null); } /** * If configured for SAS AuthType, appends SAS token to queryBuilder. - * @param path - * @param operation - * @param queryBuilder + * @param path for which SAS token is required. + * @param operation for which SAS token is required. + * @param queryBuilder to which SAS token is appended. * @param cachedSasToken - previously acquired SAS token to be reused. * @return sasToken - returned for optional re-use. - * @throws SASTokenProviderException + * @throws SASTokenProviderException if SAS token cannot be acquired. */ - private String appendSASTokenToQuery(String path, - String operation, - AbfsUriQueryBuilder queryBuilder, - String cachedSasToken) + protected String appendSASTokenToQuery(String path, + String operation, + AbfsUriQueryBuilder queryBuilder, + String cachedSasToken) throws SASTokenProviderException { String sasToken = null; if (this.authType == AuthType.SAS) { @@ -1506,17 +1066,38 @@ private String appendSASTokenToQuery(String path, return sasToken; } + /** + * Creates REST operation URL with empty path for the given query. + * @param query to be added to the URL. + * @return URL for the REST operation. + * @throws AzureBlobFileSystemException if URL creation fails. + */ @VisibleForTesting - private URL createRequestUrl(final String query) throws AzureBlobFileSystemException { + protected URL createRequestUrl(final String query) throws AzureBlobFileSystemException { return createRequestUrl(EMPTY_STRING, query); } + /** + * Creates REST operation URL with given path and query. + * @param path for which URL has to be created. + * @param query to be added to the URL. + * @return URL for the REST operation. + * @throws AzureBlobFileSystemException if URL creation fails. + */ @VisibleForTesting protected URL createRequestUrl(final String path, final String query) throws AzureBlobFileSystemException { return createRequestUrl(baseUrl, path, query); } + /** + * Creates REST operation URL with given baseUrl, path and query. + * @param baseUrl to be used for the operation. + * @param path for which URL has to be created. + * @param query to be added to the URL. + * @return URL for the REST operation. + * @throws AzureBlobFileSystemException if URL creation fails. + */ @VisibleForTesting protected URL createRequestUrl(final URL baseUrl, final String path, final String query) throws AzureBlobFileSystemException { @@ -1545,6 +1126,12 @@ protected URL createRequestUrl(final URL baseUrl, final String path, final Strin return url; } + /** + * returns the url encoded string for a given value. + * @param value to be encoded. + * @return url encoded string. + * @throws AzureBlobFileSystemException if encoding fails. 
+ */
 public static String urlEncode(final String value) throws AzureBlobFileSystemException {
   String encodedString;
   try {
@@ -1570,7 +1157,7 @@ protected Boolean getIsPaginatedDeleteEnabled() {
     return abfsConfiguration.isPaginatedDeleteEnabled();
   }

-  private Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
+  protected Boolean isPaginatedDelete(boolean isRecursiveDelete, boolean isNamespaceEnabled) {
     return getIsPaginatedDeleteEnabled() && isNamespaceEnabled && isRecursiveDelete;
   }

@@ -1656,7 +1243,7 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx,
    * @param buffer for getting input data for MD5 computation
    * @throws AbfsRestOperationException if Md5 computation fails
    */
-  private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
+  protected void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
       final AppendRequestParameters reqParams, final byte[] buffer)
       throws AbfsRestOperationException {
     String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(),
@@ -1671,7 +1258,7 @@ private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
    * @param bufferOffset Position where data returned by server is saved in buffer.
    * @throws AbfsRestOperationException if Md5Mismatch.
    */
-  private void verifyCheckSumForRead(final byte[] buffer,
+  protected void verifyCheckSumForRead(final byte[] buffer,
       final AbfsHttpOperation result, final int bufferOffset)
       throws AbfsRestOperationException {
     // Number of bytes returned by server could be less than or equal to what
@@ -1694,9 +1281,8 @@ private void verifyCheckSumForRead(final byte[] buffer,
   /**
    * Conditions check for allowing checksum support for read operation.
-   * Sending MD5 Hash in request headers. For more details see
-   * @see
-   * Path - Read Azure Storage Rest API.
+   * Sending MD5 Hash in request headers. For more details refer to
+   * Path - Read Azure Storage Rest API.
    * 1. Range header must be present as one of the request headers.
    * 2. buffer length must be less than or equal to 4 MB.
    * @param requestHeaders to be checked for range header.
@@ -1704,7 +1290,7 @@ private void verifyCheckSumForRead(final byte[] buffer,
    * @param bufferLength must be less than or equal to 4 MB.
    * @return true if all conditions are met.
    */
-  private boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
+  protected boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
       final AbfsHttpHeader rangeHeader, final int bufferLength) {
     return getAbfsConfiguration().getIsChecksumValidationEnabled() && requestHeaders.contains(rangeHeader)
         && bufferLength <= 4 * ONE_MB;
@@ -1713,12 +1299,11 @@ private boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
   /**
    * Conditions check for allowing checksum support for write operation.
    * Server will support this if client sends the MD5 Hash as a request header.
-   * For azure stoage service documentation see
-   * @see
-   * Path - Update Azure Rest API.
+   * For Azure storage service documentation and more details refer to
+   * Path - Update Azure Rest API.
    * @return true if checksum validation enabled.
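The write-path checksum in addCheckSumHeaderForWrite hashes exactly the buffer slice being transmitted. A self-contained sketch of that computation (the Base64 transport and Content-MD5 header name are my reading of the convention, not copied from the patch):

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Base64;

    class Md5HeaderSketch {
      // Hash only buffer[offset, offset + length), i.e. the bytes actually sent,
      // so the server can verify the payload it received.
      static String contentMd5(byte[] buffer, int offset, int length)
          throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(buffer, offset, length);
        return Base64.getEncoder().encodeToString(md5.digest());
      }
    }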
*/ - private boolean isChecksumValidationEnabled() { + protected boolean isChecksumValidationEnabled() { return getAbfsConfiguration().getIsChecksumValidationEnabled(); } @@ -2011,4 +1596,12 @@ AbfsApacheHttpClient getAbfsApacheHttpClient() { KeepAliveCache getKeepAliveCache() { return keepAliveCache; } + + protected String getUserAgent() { + return userAgent; + } + + protected boolean isRenameResilience() { + return renameResilience; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java new file mode 100644 index 0000000000..12d800939a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.net.URL; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; + +import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs; + +/** + * AbfsClientHandler is a class that provides a way to get the AbfsClient + * based on the service type. 
+ */ +public class AbfsClientHandler { + public static final Logger LOG = LoggerFactory.getLogger(AbfsClientHandler.class); + + private AbfsServiceType defaultServiceType; + private final AbfsDfsClient dfsAbfsClient; + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, tokenProvider, null, encryptionContextProvider, + abfsClientContext); + initServiceType(abfsConfiguration); + } + + public AbfsClientHandler(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials, + abfsConfiguration, null, sasTokenProvider, encryptionContextProvider, + abfsClientContext); + initServiceType(abfsConfiguration); + } + + /** + * Initialize the default service type based on the user configuration. + * @param abfsConfiguration set by user. + */ + private void initServiceType(final AbfsConfiguration abfsConfiguration) { + this.defaultServiceType = abfsConfiguration.getFsConfiguredServiceType(); + } + + /** + * Get the AbfsClient based on the default service type. + * @return AbfsClient + */ + public AbfsClient getClient() { + return getClient(defaultServiceType); + } + + /** + * Get the AbfsClient based on the service type. + * @param serviceType AbfsServiceType + * @return AbfsClient + */ + public AbfsClient getClient(AbfsServiceType serviceType) { + return serviceType == AbfsServiceType.DFS ? dfsAbfsClient : null; + } + + /** + * Create the AbfsDfsClient using the url used to configure file system. + * If URL is for Blob endpoint, it will be converted to DFS endpoint. + * @param baseUrl URL + * @param creds SharedKeyCredentials + * @param abfsConfiguration AbfsConfiguration + * @param tokenProvider AccessTokenProvider + * @param sasTokenProvider SASTokenProvider + * @param encryptionContextProvider EncryptionContextProvider + * @param abfsClientContext AbfsClientContext + * @return AbfsDfsClient with DFS endpoint URL + * @throws IOException if URL conversion fails. 
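createDfsClient always talks to the DFS endpoint, so a filesystem configured against the Blob endpoint has its URL rewritten first via UriUtils.changeUrlFromBlobToDfs. A simplified stand-in for that conversion, assuming the standard *.blob.core.windows.net / *.dfs.core.windows.net account domains (class and method names are illustrative):

    import java.net.MalformedURLException;
    import java.net.URL;

    class EndpointSwapSketch {
      // Simplified: swap only the first ".blob." domain segment for ".dfs.".
      static URL toDfsEndpoint(URL blobUrl) throws MalformedURLException {
        return new URL(blobUrl.toString().replaceFirst("\\.blob\\.", ".dfs."));
      }

      public static void main(String[] args) throws Exception {
        URL blob = new URL("https://myaccount.blob.core.windows.net/container");
        System.out.println(toDfsEndpoint(blob));
        // -> https://myaccount.dfs.core.windows.net/container
      }
    }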
+ */ + private AbfsDfsClient createDfsClient(final URL baseUrl, + final SharedKeyCredentials creds, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + URL dfsUrl = changeUrlFromBlobToDfs(baseUrl); + if (tokenProvider != null) { + LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl); + return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration, + tokenProvider, encryptionContextProvider, + abfsClientContext); + } else { + LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl); + return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration, + sasTokenProvider, encryptionContextProvider, + abfsClientContext); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java new file mode 100644 index 0000000000..f2eebd8800 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -0,0 +1,1302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
    + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; +import org.apache.hadoop.fs.azurebfs.utils.Base64; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.util.StringUtils; + +import static org.apache.commons.lang3.StringUtils.isEmpty; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOB_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_JSON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_OCTET_STREAM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BREAK_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHECK_ACCESS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DEFAULT_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILESYSTEM; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FLUSH_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_ACCESS_CONTROL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.GET_STATUS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_DELETE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_HEAD; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_POST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RELEASE_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.RENEW_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_ACCESS_CONTROL; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SET_PROPERTIES_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_BREAK_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_DURATION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPERTIES; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_PROPOSED_LEASE_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RANGE_GET_CONTENT_MD5; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RENAME_SOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_FS_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_BLOBTYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CONTINUATION; 
+import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_FLUSH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAXRESULTS; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PAGINATED; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_POSITION; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RECURSIVE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; + +/** + * AbfsClient interacting with the DFS Endpoint. + */ +public class AbfsDfsClient extends AbfsClient { + + public AbfsDfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final AccessTokenProvider tokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider, + encryptionContextProvider, abfsClientContext); + } + + public AbfsDfsClient(final URL baseUrl, + final SharedKeyCredentials sharedKeyCredentials, + final AbfsConfiguration abfsConfiguration, + final SASTokenProvider sasTokenProvider, + final EncryptionContextProvider encryptionContextProvider, + final AbfsClientContext abfsClientContext) throws IOException { + super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider, + encryptionContextProvider, abfsClientContext); + } + + /** + * Create request headers for Rest Operation using the default API version. + * @return default request headers. + */ + @Override + public List createDefaultHeaders() { + return this.createDefaultHeaders(getxMsVersion()); + } + + /** + * Create request headers for Rest Operation using the specified API version. + * DFS Endpoint API responses are in JSON/Stream format. + * @param xMsVersion API version to be used. + * @return default request headers. + */ + @Override + public List createDefaultHeaders(ApiVersion xMsVersion) { + List requestHeaders = createCommonHeaders(xMsVersion); + requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON + + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM)); + return requestHeaders; + } + + /** + * Get Rest Operation for API + * + * Filesystem - Create. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
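The DFS-specific createDefaultHeaders override above advertises both payload formats the endpoint can return. Inlining the constants it concatenates gives the literal header value (a direct reading of the code, shown for clarity):

    class AcceptHeaderSketch {
      public static void main(String[] args) {
        // APPLICATION_JSON + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM
        String accept = "application/json" + "," + " " + "application/octet-stream";
        System.out.println("Accept: " + accept);
        // -> Accept: application/json, application/octet-stream
      }
    }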
+ */ + @Override + public AbfsRestOperation createFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreateFileSystem, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Filesystem - Set Properties. + * @param properties list of metadata key-value pairs. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setFilesystemProperties(final Hashtable properties, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + final List requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to work around the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, commaSeparatedProperties)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetFileSystemProperties, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Filesystem - Get Properties. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + * */ + @Override + public AbfsRestOperation getFilesystemProperties(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetFileSystemProperties, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Filesystem - Delete. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
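setFilesystemProperties above serializes user metadata through convertXmsPropertiesToCommaSeparatedString before placing it in the x-ms-properties header. A hedged sketch of the wire format as I read it, key=Base64(value) pairs joined by commas; the exact charset handling in the real helper may differ:

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.StringJoiner;

    class XmsPropertiesSketch {
      static String toCommaSeparated(Hashtable<String, String> properties) {
        StringJoiner pairs = new StringJoiner(",");
        for (Map.Entry<String, String> e : properties.entrySet()) {
          // Values are encoded so arbitrary text survives the HTTP header.
          String encoded = Base64.getEncoder()
              .encodeToString(e.getValue().getBytes(StandardCharsets.US_ASCII));
          pairs.add(e.getKey() + "=" + encoded);
        }
        return pairs.toString();
      }
    }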
+ */ + @Override + public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.DeleteFileSystem, + HTTP_METHOD_DELETE, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Filesystem - List. + * List paths and their properties in the current filesystem. + * @param relativePath to return only blobs within this directory. + * @param recursive to return all blobs in the path, including those in subdirectories. + * @param listMaxResults maximum number of blobs to return. + * @param continuation marker to specify the continuation token. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation or response parsing fails. + */ + @Override + public AbfsRestOperation listPath(final String relativePath, + final boolean recursive, + final int listMaxResults, + final String continuation, + TracingContext tracingContext) throws IOException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, + getDirectoryQueryParameter(relativePath)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, + String.valueOf(listMaxResults)); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, + String.valueOf(getAbfsConfiguration().isUpnUsed())); + appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.ListPaths, + HTTP_METHOD_GET, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Create. + * Create a path (file or directory) in the current filesystem. + * @param path to be created inside the filesystem. + * @param isFile to specify if the created path is file or directory. + * @param overwrite to specify if the path should be overwritten if it already exists. + * @param permissions to specify the permissions of the path. + * @param isAppendBlob to specify if the path to be created is an append blob. + * @param eTag to specify conditional headers. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
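The createPath implementation that follows enforces the overwrite flag with HTTP preconditions rather than a read-before-write: If-None-Match: * makes the PUT fail on an existing path, and a caller-supplied eTag narrows an overwrite to one specific version. A sketch of just that header logic (HttpURLConnection for illustration only):

    import java.net.HttpURLConnection;

    class CreatePreconditionSketch {
      static void addCreatePreconditions(HttpURLConnection conn,
          boolean overwrite, String eTag) {
        if (!overwrite) {
          // Create must fail (conflict/precondition response) if the path exists.
          conn.setRequestProperty("If-None-Match", "*");
        }
        if (eTag != null && !eTag.isEmpty()) {
          // Replace only the exact version the caller observed.
          conn.setRequestProperty("If-Match", eTag);
        }
      }
    }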
+ */ + @Override + public AbfsRestOperation createPath(final String path, + final boolean isFile, + final boolean overwrite, + final AzureBlobFileSystemStore.Permissions permissions, + final boolean isAppendBlob, + final String eTag, + final ContextEncryptionAdapter contextEncryptionAdapter, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + if (isFile) { + addEncryptionKeyRequestHeaders(path, requestHeaders, true, + contextEncryptionAdapter, tracingContext); + } + if (!overwrite) { + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + } + + if (permissions.hasPermission()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, + permissions.getPermission())); + } + + if (permissions.hasUmask()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, + permissions.getUmask())); + } + + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); + if (isAppendBlob) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE); + } + + String operation = isFile + ? SASTokenProvider.CREATE_FILE_OPERATION + : SASTokenProvider.CREATE_DIRECTORY_OPERATION; + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CreatePath, + HTTP_METHOD_PUT, url, requestHeaders); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException ex) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw ex; + } + if (!isFile && op.getResult().getStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + String existingResource = + op.getResult().getResponseHeader(X_MS_EXISTING_RESOURCE_TYPE); + if (existingResource != null && existingResource.equals(DIRECTORY)) { + return op; //don't throw ex on mkdirs for existing directory + } + } + throw ex; + } + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Lease. + * Acquire lease on specified path. + * @param path on which lease has to be acquired. + * @param duration for which lease has to be acquired. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation acquireLease(final String path, final int duration, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION)); + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration))); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, + UUID.randomUUID().toString())); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.LeasePath, + HTTP_METHOD_POST, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Lease. 
+   * Renew lease on specified path.
+   * @param path on which lease has to be renewed.
+   * @param leaseId of the lease to be renewed.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation renewLease(final String path, final String leaseId,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        HTTP_METHOD_POST, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   *
+   * Path - Lease.
+   * Release lease on specified path.
+   * @param path on which lease has to be released.
+   * @param leaseId of the lease to be released.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation releaseLease(final String path, final String leaseId,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        HTTP_METHOD_POST, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   *
+   * Path - Lease.
+   * Break lease on specified path.
+   * @param path on which lease has to be broken.
+   * @param tracingContext for tracing the server calls.
+   * @return executed rest operation containing response from server.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  @Override
+  public AbfsRestOperation breakLease(final String path,
+      TracingContext tracingContext) throws AzureBlobFileSystemException {
+    final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+    requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD,
+        DEFAULT_LEASE_BREAK_PERIOD));
+
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+    final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+    final AbfsRestOperation op = getAbfsRestOperation(
+        AbfsRestOperationType.LeasePath,
+        HTTP_METHOD_POST, url, requestHeaders);
+    op.execute(tracingContext);
+    return op;
+  }
+
+  /**
+   * Get Rest Operation for API
+   *
+   * Path - Create.
+   * @param source path to source file
+   * @param destination destination of rename.
+   * @param continuation continuation token.
+   * @param tracingContext for tracing the server calls.
+   * @param sourceEtag etag of source file.
may be null or empty + * @param isMetadataIncompleteState was there a rename failure due to incomplete metadata state? + * @param isNamespaceEnabled whether namespace enabled account or not + * @return executed rest operation containing response from server. + * @throws IOException if rest operation fails. + */ + @Override + public AbfsClientRenameResult renamePath( + final String source, + final String destination, + final String continuation, + final TracingContext tracingContext, + String sourceEtag, + boolean isMetadataIncompleteState, + boolean isNamespaceEnabled) throws IOException { + final List requestHeaders = createDefaultHeaders(); + + final boolean hasEtag = !isEmpty(sourceEtag); + + boolean shouldAttemptRecovery = isRenameResilience() && isNamespaceEnabled; + if (!hasEtag && shouldAttemptRecovery) { + // in case eTag is already not supplied to the API + // and rename resilience is expected and it is an HNS enabled account + // fetch the source etag to be used later in recovery + try { + final AbfsRestOperation srcStatusOp = getPathStatus(source, + false, tracingContext, null); + if (srcStatusOp.hasResult()) { + final AbfsHttpOperation result = srcStatusOp.getResult(); + sourceEtag = extractEtagHeader(result); + // and update the directory status. + boolean isDir = checkIsDir(result); + shouldAttemptRecovery = !isDir; + LOG.debug( + "Retrieved etag of source for rename recovery: {}; isDir={}", + sourceEtag, isDir); + } + } catch (AbfsRestOperationException e) { + throw new AbfsRestOperationException(e.getStatusCode(), + SOURCE_PATH_NOT_FOUND.getErrorCode(), + e.getMessage(), e); + } + + } + + String encodedRenameSource = urlEncode( + FORWARD_SLASH + this.getFileSystem() + source); + if (getAuthType() == AuthType.SAS) { + final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder(); + appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, + srcQueryBuilder); + encodedRenameSource += srcQueryBuilder.toString(); + } + + LOG.trace("Rename source queryparam added {}", encodedRenameSource); + requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + appendSASTokenToQuery(destination, + SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder); + + final URL url = createRequestUrl(destination, + abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = createRenameRestOperation(url, requestHeaders); + try { + incrementAbfsRenamePath(); + op.execute(tracingContext); + // AbfsClientResult contains the AbfsOperation, If recovery happened or + // not, and the incompleteMetaDataState is true or false. + // If we successfully rename a path and isMetadataIncompleteState was + // true, then rename was recovered, else it didn't, this is why + // isMetadataIncompleteState is used for renameRecovery(as the 2nd param). + return new AbfsClientRenameResult(op, isMetadataIncompleteState, + isMetadataIncompleteState); + } catch (AzureBlobFileSystemException e) { + // If we have no HTTP response, throw the original exception. + if (!op.hasResult()) { + throw e; + } + + // ref: HADOOP-18242. Rename failure occurring due to a rare case of + // tracking metadata being in incomplete state. 
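The decision of whether to attempt rename recovery reduces to a simple predicate. Below is a minimal standalone sketch; the class and method names are illustrative, only the rule itself comes from the code above: recovery is attempted only when rename resilience is enabled, the account is HNS-enabled, and the source is a file whose etag could be determined.

```java
final class RenameRecoverySketch {
  /** Mirrors the shouldAttemptRecovery value computed in renamePath above. */
  static boolean shouldAttemptRecovery(boolean renameResilience,
      boolean isNamespaceEnabled, String sourceEtag, boolean sourceIsDir) {
    // Recovery needs resilience enabled, an HNS account, a known source
    // etag, and a file source: a directory has no single etag to verify.
    return renameResilience && isNamespaceEnabled
        && sourceEtag != null && !sourceEtag.isEmpty() && !sourceIsDir;
  }
}
```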
+ if (op.getResult().getStorageErrorCode() + .equals(RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode()) + && !isMetadataIncompleteState) { + //Logging + ABFS_METADATA_INCOMPLETE_RENAME_FAILURE + .info( + "Rename Failure attempting to resolve tracking metadata state and retrying."); + // rename recovery should be attempted in this case also + shouldAttemptRecovery = true; + isMetadataIncompleteState = true; + String sourceEtagAfterFailure = sourceEtag; + if (isEmpty(sourceEtagAfterFailure)) { + // Doing a HEAD call resolves the incomplete metadata state and + // then we can retry the rename operation. + AbfsRestOperation sourceStatusOp = getPathStatus(source, false, + tracingContext, null); + isMetadataIncompleteState = true; + // Extract the sourceEtag, using the status Op, and set it + // for future rename recovery. + AbfsHttpOperation sourceStatusResult = sourceStatusOp.getResult(); + sourceEtagAfterFailure = extractEtagHeader(sourceStatusResult); + } + renamePath(source, destination, continuation, tracingContext, + sourceEtagAfterFailure, isMetadataIncompleteState, + isNamespaceEnabled); + } + // if we get out of the condition without a successful rename, then + // it isn't metadata incomplete state issue. + isMetadataIncompleteState = false; + + // setting default rename recovery success to false + boolean etagCheckSucceeded = false; + if (shouldAttemptRecovery) { + etagCheckSucceeded = renameIdempotencyCheckOp( + source, + sourceEtag, op, destination, tracingContext); + } + if (!etagCheckSucceeded) { + // idempotency did not return different result + // throw back the exception + throw e; + } + return new AbfsClientRenameResult(op, true, isMetadataIncompleteState); + } + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * Uploads data to be appended to a file. + * @param path to which data has to be appended. + * @param buffer containing data to be appended. + * @param reqParams containing parameters for append operation like offset, length etc. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation append(final String path, + final byte[] buffer, + AppendRequestParameters reqParams, + final String cachedSasToken, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + if (reqParams.isExpectHeaderEnabled()) { + requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); + } + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. 
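The PATCH workaround named in the comment above is a plain JDK limitation: HttpURLConnection.setRequestMethod rejects PATCH with a ProtocolException. A JDK-only sketch of the same trick, with an illustrative class name:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

final class PatchOverrideSketch {
  /** Opens a connection that sends PUT but declares PATCH as the real verb. */
  static HttpURLConnection openPatchAsPut(URL url) throws IOException {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT"); // "PATCH" would throw ProtocolException here
    conn.setRequestProperty("X-Http-Method-Override", "PATCH");
    conn.setDoOutput(true);
    return conn;
  }
}
```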
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (reqParams.getLeaseId() != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId())); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, + Long.toString(reqParams.getPosition())); + + if ((reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_MODE) || ( + reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE)) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, TRUE); + if (reqParams.getMode() == AppendRequestParameters.Mode.FLUSH_CLOSE_MODE) { + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, TRUE); + } + } + + // Check if the retry is with "Expect: 100-continue" header being present in the previous request. + if (reqParams.isRetryDueToExpect()) { + String userAgentRetry = getUserAgent(); + // Remove the specific marker related to "Expect: 100-continue" from the User-Agent string. + userAgentRetry = userAgentRetry.replace(HUNDRED_CONTINUE_USER_AGENT, EMPTY_STRING); + requestHeaders.removeIf(header -> header.getName().equalsIgnoreCase(USER_AGENT)); + requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry)); + } + + // Add MD5 Hash of request content as request header if feature is enabled + if (isChecksumValidationEnabled()) { + addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + } + + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, + SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.Append, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, reqParams.getoffset(), reqParams.getLength(), + sasTokenForReuse); + try { + op.execute(tracingContext); + } catch (AbfsRestOperationException e) { + /* + If the http response code indicates a user error we retry + the same append request with expect header being disabled. + When "100-continue" header is enabled but a non Http 100 response comes, + the response message might not get set correctly by the server. + So, this handling is to avoid breaking of backward compatibility + if someone has taken dependency on the exception message, + which is created using the error string present in the response header. + */ + int responseStatusCode = e.getStatusCode(); + if (checkUserError(responseStatusCode) + && reqParams.isExpectHeaderEnabled()) { + LOG.debug( + "User error, retrying without 100 continue enabled for the given path {}", + path); + reqParams.setExpectHeaderEnabled(false); + reqParams.setRetryDueToExpect(true); + return this.append(path, buffer, reqParams, cachedSasToken, + contextEncryptionAdapter, tracingContext); + } + // If we have no HTTP response, throw the original exception. 
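The retry decision in the catch block just below can be summarized in isolation (names illustrative): a 4xx failure on a request that carried "Expect: 100-continue" is retried exactly once with the header disabled, because the error body of a rejected 100-continue exchange may not be set by the server.

```java
final class ExpectRetrySketch {
  /** Mirrors the retry rule in append() above: user error plus Expect enabled. */
  static boolean shouldRetryWithoutExpect(int statusCode, boolean expectEnabled) {
    boolean userError = statusCode >= 400 && statusCode < 500;
    return userError && expectEnabled;
  }
}
```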
+ if (!op.hasResult()) { + throw e; + } + + if (isMd5ChecksumError(e)) { + throw new AbfsInvalidChecksumException(e); + } + + if (reqParams.isAppendBlob() + && appendSuccessCheckOp(op, path, + (reqParams.getPosition() + reqParams.getLength()), tracingContext)) { + final AbfsRestOperation successOp = getAbfsRestOperation( + AbfsRestOperationType.Append, + HTTP_METHOD_PUT, url, requestHeaders, + buffer, reqParams.getoffset(), reqParams.getLength(), + sasTokenForReuse); + successOp.hardSetResult(HttpURLConnection.HTTP_OK); + return successOp; + } + throw e; + } catch (AzureBlobFileSystemException e) { + // Any server side issue will be returned as AbfsRestOperationException and will be handled above. + LOG.debug( + "Append request failed with non server issues for path: {}, offset: {}, position: {}", + path, reqParams.getoffset(), reqParams.getPosition()); + throw e; + } + + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * Flush previously uploaded data to a file. + * @param path on which data has to be flushed. + * @param position to which data has to be flushed. + * @param retainUncommittedData whether to retain uncommitted data after flush. + * @param isClose specify if this is the last flush to the file. + * @param cachedSasToken to be used for the authenticating operation. + * @param leaseId if there is an active lease on the path. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation flush(final String path, + final long position, + boolean retainUncommittedData, + boolean isClose, + final String cachedSasToken, + final String leaseId, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. 
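The appendSuccessCheckOp probe referenced above is defined elsewhere in the client; conceptually it asks whether a failed append for an append blob actually committed on the server. A sketch of that idea under an assumed length-probe helper (the interface below is hypothetical, for example backed by a HEAD/getPathStatus call): if the remote file already covers position + length, the retried append can be treated as successful.

```java
import java.io.IOException;

final class AppendProbeSketch {
  /** Hypothetical helper that returns the current remote file length. */
  interface LengthProbe {
    long remoteFileLength(String path) throws IOException;
  }

  /** True when the target file already contains the appended range. */
  static boolean appendAlreadyCommitted(LengthProbe probe, String path,
      long position, int length) throws IOException {
    return probe.remoteFileLength(path) >= position + length;
  }
}
```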
+ requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (leaseId != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, + String.valueOf(retainUncommittedData)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, + SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.Flush, + HTTP_METHOD_PUT, url, requestHeaders, + sasTokenForReuse); + op.execute(tracingContext); + return op; + } + + @Override + public AbfsRestOperation flush(byte[] buffer, + final String path, + boolean isClose, + final String cachedSasToken, + final String leaseId, + final String eTag, + final TracingContext tracingContext) throws AzureBlobFileSystemException { + throw new UnsupportedOperationException( + "Flush with blockIds not supported on DFS Endpoint"); + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * Set the properties of a file or directory. + * @param path on which properties have to be set. + * @param properties list of metadata key-value pairs. + * @param tracingContext for tracing the server calls. + * @param contextEncryptionAdapter to provide encryption context. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setPathProperties(final String path, + final Hashtable properties, + final TracingContext tracingContext, + final ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException { + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, commaSeparatedProperties)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION); + appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetPathProperties, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Get Properties. + * Get the properties of a file or directory. 
+ * @param path of which properties have to be fetched. + * @param includeProperties to include user defined properties. + * @param tracingContext for tracing the server calls. + * @param contextEncryptionAdapter to provide encryption context. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation getPathStatus(final String path, + final boolean includeProperties, + final TracingContext tracingContext, + final ContextEncryptionAdapter contextEncryptionAdapter) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + String operation = SASTokenProvider.GET_PROPERTIES_OPERATION; + if (!includeProperties) { + // The default action (operation) is implicitly to get properties and this action requires read permission + // because it reads user defined properties. If the action is getStatus or getAclStatus, then + // only traversal (execute) permission is required. + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_STATUS); + operation = SASTokenProvider.GET_STATUS_OPERATION; + } else { + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + } + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, + String.valueOf(getAbfsConfiguration().isUpnUsed())); + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetPathStatus, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Read. + * Read the contents of the file at specified path + * @param path of the file to be read. + * @param position in the file from where data has to be read. + * @param buffer to store the data read. + * @param bufferOffset offset in the buffer to start storing the data. + * @param bufferLength length of data to be read. + * @param eTag to specify conditional headers. + * @param cachedSasToken to be used for the authenticating operation. + * @param contextEncryptionAdapter to provide encryption context. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation read(final String path, + final long position, + final byte[] buffer, + final int bufferOffset, + final int bufferLength, + final String eTag, + String cachedSasToken, + ContextEncryptionAdapter contextEncryptionAdapter, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + addEncryptionKeyRequestHeaders(path, requestHeaders, false, + contextEncryptionAdapter, tracingContext); + AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE, + String.format("bytes=%d-%d", position, position + bufferLength - 1)); + requestHeaders.add(rangeHeader); + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + + // Add request header to fetch MD5 Hash of data returned by server. 
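Read-side checksum validation, used further below, computes MD5 over exactly the returned slice of the buffer and compares it with the base64 Content-MD5 value the server sends back. A self-contained JDK-only sketch of that comparison (class name illustrative):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class ReadChecksumSketch {
  /** Compares the server-sent base64 Content-MD5 with a locally computed one. */
  static boolean md5Matches(byte[] buffer, int offset, int length,
      String contentMd5Base64) throws NoSuchAlgorithmException {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    md5.update(buffer, offset, length); // hash only the received slice
    byte[] expected = Base64.getDecoder().decode(contentMd5Base64);
    return MessageDigest.isEqual(expected, md5.digest());
  }
}
```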
+ if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, + SASTokenProvider.READ_OPERATION, + abfsUriQueryBuilder, cachedSasToken); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.ReadFile, + HTTP_METHOD_GET, url, requestHeaders, + buffer, bufferOffset, bufferLength, + sasTokenForReuse); + op.execute(tracingContext); + + // Verify the MD5 hash returned by server holds valid on the data received. + if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { + verifyCheckSumForRead(buffer, op.getResult(), bufferOffset); + } + + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Delete. + * Delete the file or directory at specified path. + * @param path to be deleted. + * @param recursive if the path is a directory, delete recursively. + * @param continuation to specify continuation token. + * @param tracingContext for tracing the server calls. + * @param isNamespaceEnabled specify if the namespace is enabled. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation deletePath(final String path, + final boolean recursive, + final String continuation, + TracingContext tracingContext, + final boolean isNamespaceEnabled) throws AzureBlobFileSystemException { + /* + * If Pagination is enabled and current API version is old, + * use the minimum required version for pagination. + * If Pagination is enabled and current API version is later than minimum required + * version for pagination, use current version only as azure service is backward compatible. + * If pagination is disabled, use the current API version only. + */ + final List requestHeaders = (isPaginatedDelete(recursive, + isNamespaceEnabled) && getxMsVersion().compareTo( + ApiVersion.AUG_03_2023) < 0) + ? createDefaultHeaders(ApiVersion.AUG_03_2023) + : createDefaultHeaders(); + final AbfsUriQueryBuilder abfsUriQueryBuilder + = createDefaultUriQueryBuilder(); + + if (isPaginatedDelete(recursive, isNamespaceEnabled)) { + // Add paginated query parameter + abfsUriQueryBuilder.addQuery(QUERY_PARAM_PAGINATED, TRUE); + } + + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, + String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + String operation = recursive + ? SASTokenProvider.DELETE_RECURSIVE_OPERATION + : SASTokenProvider.DELETE_OPERATION; + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = new AbfsRestOperation( + AbfsRestOperationType.DeletePath, this, + HTTP_METHOD_DELETE, url, requestHeaders, getAbfsConfiguration()); + try { + op.execute(tracingContext); + } catch (AzureBlobFileSystemException e) { + // If we have no HTTP response, throw the original exception. 
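The API-version selection for paginated delete, shown above, can be stated in isolation. In this sketch the literal "2023-08-03" is the assumed wire form of ApiVersion.AUG_03_2023: an older configured x-ms-version is bumped up to the pagination minimum, while a newer one is kept, since the service is backward compatible.

```java
final class DeleteVersionSketch {
  /** Assumed wire form of ApiVersion.AUG_03_2023. */
  private static final String MIN_PAGINATION_VERSION = "2023-08-03";

  /** x-ms-version to send for a delete, mirroring the selection above. */
  static String versionForDelete(boolean paginated, String configuredVersion) {
    // ISO dates compare correctly as strings, so compareTo is sufficient.
    if (paginated && configuredVersion.compareTo(MIN_PAGINATION_VERSION) < 0) {
      return MIN_PAGINATION_VERSION; // bump: pagination needs at least this
    }
    return configuredVersion; // newer versions are backward compatible
  }
}
```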
+ if (!op.hasResult()) { + throw e; + } + final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op); + if (idempotencyOp.getResult().getStatusCode() + == op.getResult().getStatusCode()) { + // idempotency did not return different result + // throw back the exception + throw e; + } else { + return idempotencyOp; + } + } + + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * @param path on which owner has to be set. + * @param owner to be set. + * @param group to be set. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setOwner(final String path, + final String owner, + final String group, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + if (owner != null && !owner.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner)); + } + if (group != null && !group.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL); + appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetOwner, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * @param path on which permission has to be set. + * @param permission to be set. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setPermission(final String path, + final String permission, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader( + HttpHeaderConfigurations.X_MS_PERMISSIONS, permission)); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_ACCESS_CONTROL); + appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetPermissions, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Update. + * @param path on which ACL has to be set. + * @param aclSpecString to be set. + * @param eTag to specify conditional headers. 
Set only if etag matches. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation setAcl(final String path, + final String aclSpecString, + final String eTag, + TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + // JDK7 does not support PATCH, so to workaround the issue we will use + // PUT and specify the real method in the X-Http-Method-Override header. + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); + requestHeaders.add( + new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString)); + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add( + new AbfsHttpHeader(IF_MATCH, eTag)); + } + + final AbfsUriQueryBuilder abfsUriQueryBuilder + = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, + SET_ACCESS_CONTROL); + appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.SetAcl, + HTTP_METHOD_PUT, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Get Properties. + * Retrieves the ACL properties of blob at specified path. + * @param path of which properties have to be fetched. + * @param useUPN whether to use UPN with rest operation. + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + @Override + public AbfsRestOperation getAclStatus(final String path, + final boolean useUPN, + TracingContext tracingContext) throws AzureBlobFileSystemException { + final List requestHeaders = createDefaultHeaders(); + + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, GET_ACCESS_CONTROL); + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, + String.valueOf(useUPN)); + appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, + abfsUriQueryBuilder); + + final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + final AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.GetAcl, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Get Rest Operation for API + * + * Path - Get Properties. + * @param path Path for which access check needs to be performed + * @param rwx The permission to be checked on the path + * @param tracingContext for tracing the server calls. + * @return executed rest operation containing response from server. + * @throws AzureBlobFileSystemException if rest operation fails. 
+ */ + @Override + public AbfsRestOperation checkAccess(String path, + String rwx, + TracingContext tracingContext) + throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + + AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS); + abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx); + appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, + abfsUriQueryBuilder); + + URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); + AbfsRestOperation op = getAbfsRestOperation( + AbfsRestOperationType.CheckAccess, + HTTP_METHOD_HEAD, url, requestHeaders); + op.execute(tracingContext); + return op; + } + + /** + * Checks whether the rest operation result indicates that the path is a directory. + * @param result executed rest operation containing response from server. + * @return True if the path is a directory, False otherwise. + */ + @Override + public boolean checkIsDir(AbfsHttpOperation result) { + String resourceType = result.getResponseHeader( + HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + return StringUtils.equalsIgnoreCase(resourceType, DIRECTORY); + } + + /** + * Returns true if the status code lies in the range of user errors. + * @param responseStatusCode http response status code. + * @return true if the status code is a user error (4xx), false otherwise. + */ + @Override + public boolean checkUserError(int responseStatusCode) { + return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST + && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR); + } + + private String convertXmsPropertiesToCommaSeparatedString(final Map<String, String> properties) throws CharacterCodingException { + StringBuilder commaSeparatedProperties = new StringBuilder(); + + final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING_ASCII).newEncoder(); + + for (Map.Entry<String, String> propertyEntry : properties.entrySet()) { + String key = propertyEntry.getKey(); + String value = propertyEntry.getValue(); + + Boolean canEncodeValue = encoder.canEncode(value); + if (!canEncodeValue) { + throw new CharacterCodingException(); + } + + String encodedPropertyValue = Base64.encode(encoder.encode(CharBuffer.wrap(value)).array()); + commaSeparatedProperties.append(key) + .append(AbfsHttpConstants.EQUAL) + .append(encodedPropertyValue); + + commaSeparatedProperties.append(AbfsHttpConstants.COMMA); + } + + if (commaSeparatedProperties.length() != 0) { + commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1); + } + + return commaSeparatedProperties.toString(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java index e27d54b443..c769186692 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.utils; import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; import java.net.URL; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; @@ -30,11 +31,14 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.http.NameValuePair; import org.apache.http.client.utils.URLEncodedUtils; import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EQUAL; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SAOID; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SIGNATURE; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SKOID; @@ -169,6 +173,38 @@ public static String getMaskedUrl(URL url) { return url.toString().replace(queryString, maskedQueryString); } + /** + * Changes a Blob Endpoint URL to a DFS Endpoint URL. + * If the original URL is not a Blob Endpoint URL, the original URL is returned. + * @param url to be converted. + * @return updated URL. + * @throws InvalidUriException in case of MalformedURLException. + */ + public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException { + try { + url = new URL(url.toString().replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME)); + } catch (MalformedURLException ex) { + throw new InvalidUriException(url.toString()); + } + return url; + } + + /** + * Changes a DFS Endpoint URL to a Blob Endpoint URL. + * If the original URL is not a DFS Endpoint URL, the original URL is returned. + * @param url to be converted. + * @return updated URL. + * @throws InvalidUriException in case of MalformedURLException. + */ + public static URL changeUrlFromDfsToBlob(URL url) throws InvalidUriException { + try { + url = new URL(url.toString().replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME)); + } catch (MalformedURLException ex) { + throw new InvalidUriException(url.toString()); + } + return url; + } + private UriUtils() { } } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md new file mode 100644 index 0000000000..f93593cecf --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/site/markdown/fns_blob.md @@ -0,0 +1,82 @@ + + +# ABFS Driver for Namespace Disabled Accounts (FNS: Flat Namespace) + +### Note: FNS-Blob support is under development and not yet ready for use. + +## Background +The ABFS driver is recommended only for HNS-enabled ADLS Gen-2 accounts when +used for big data analytics, where it is more performant and scalable. + +However, to let users of the legacy WASB driver migrate to the ABFS driver without +having to upgrade their general purpose V2 (HNS-disabled) accounts, support +for FNS accounts is being added to the ABFS driver. +Refer to [WASB Deprecation](./wasb.html) for more details. + +## Azure Service Endpoints Used by ABFS Driver +Azure offers two sets of endpoints for interacting with storage accounts: +1. [Azure Blob Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api), referred to as the Blob endpoint +2. [Azure Data Lake Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups), referred to as the DFS endpoint + +By default, the ABFS driver is designed to work only with the DFS endpoint, +which primarily supports HNS-enabled accounts. + +To let the ABFS driver work with FNS accounts, support for the Blob endpoint is +being added, since Azure does not recommend using the DFS endpoint for FNS accounts. +The ABFS driver will only allow FNS accounts to be accessed using the Blob endpoint.
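The UriUtils helpers added above perform the endpoint swap as a plain host substitution. A JDK-only equivalent, with the domain constants inlined to their expected values (the literal suffixes below are an assumption, matching what FileSystemUriSchemes is expected to define):

```java
import java.net.MalformedURLException;
import java.net.URL;

final class EndpointSwapSketch {
  // Assumed values of ABFS_BLOB_DOMAIN_NAME and ABFS_DFS_DOMAIN_NAME.
  private static final String BLOB_DOMAIN = ".blob.core.windows.net";
  private static final String DFS_DOMAIN = ".dfs.core.windows.net";

  /** Blob-endpoint URL to DFS-endpoint URL; other URLs pass through unchanged. */
  static URL toDfs(URL url) throws MalformedURLException {
    return new URL(url.toString().replace(BLOB_DOMAIN, DFS_DOMAIN));
  }
}
```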
+HNS-enabled accounts will continue to use the DFS endpoint, which remains the +recommended stack for performance and feature capabilities. + +## Configuring ABFS Driver for FNS Accounts +The following configurations are introduced to configure the ABFS driver for FNS accounts: +1. Account Type: Must be set to `false` to indicate an FNS account. + ```xml + <property> + <name>fs.azure.account.hns.enabled</name> + <value>false</value> + </property> + ``` + +2. Account URL: The URL used to initialize the file system. It is either passed +directly to the file system or configured as the default URI using the `fs.defaultFS` configuration. +In both cases the URL used must be the blob endpoint URL of the account. + ```xml + <property> + <name>fs.defaultFS</name> + <value>https://ACCOUNT_NAME.blob.core.windows.net</value> + </property> + ``` +3. Service Type for FNS Accounts: This allows an override to choose the service +type, especially in cases where a local DNS resolution is set for the account and the driver is +unable to detect the intended endpoint from the configured URL. If this is set +to blob for an HNS-enabled account, file system initialization will fail with an InvalidConfiguration error. + ```xml + <property> + <name>fs.azure.fns.account.service.type</name> + <value>BLOB</value> + </property> + ``` + +4. Service Type for Ingress Operations: This allows an override to choose the service +type only for ingress-related operations like [Create](https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id), +[Append](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id) +and [Flush](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id). All other operations will still use the +configured service type. + ```xml + <property> + <name>fs.azure.ingress.service.type</name> + <value>BLOB</value> + </property> + ``` \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index 177ab282c1..143cba8a7a 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -20,6 +20,7 @@ See also: * [WASB](./wasb.html) * [ABFS](./abfs.html) +* [Namespace Disabled Accounts on ABFS](./fns_blob.html) * [Testing](./testing_azure.html) ## Introduction diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java index 0951ed9a03..b121fb9420 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java @@ -339,8 +339,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs, case SET_ATTR: Hashtable<String, String> properties = new Hashtable<>(); properties.put("key", "{ value: valueTest }"); - return client.setPathProperties(path, fs.getAbfsStore() .convertXmsPropertiesToCommaSeparatedString(properties), + return client.setPathProperties(path, properties, getTestTracingContext(fs, false), createEncryptionAdapterFromServerStoreContext(path, getTestTracingContext(fs, false), client)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java index e185ab2e75..71c77ce82c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java +++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java @@ -22,6 +22,7 @@ import java.lang.reflect.Field; import java.util.List; +import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.util.Lists; import org.junit.Assume; import org.junit.Test; @@ -97,8 +98,12 @@ private void setTestUserFs() throws Exception { + getAccountName(), ClientCredsTokenProvider.class.getName()); conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false); - conf.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED); + // Since FS init now needs to know account type setting it before init to avoid that. + conf.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isHNSEnabled); this.testUserFs = FileSystem.newInstance(conf); + // Resetting the namespace enabled flag to unknown after file system init. + ((AzureBlobFileSystem) testUserFs).getAbfsStore().setNamespaceEnabled( + Trilean.UNKNOWN); } private void setTestFsConf(final String fsConfKey, @@ -306,11 +311,11 @@ public void testFsActionALL() throws Exception { } private void checkPrerequisites() throws Exception { - setTestUserFs(); Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false", isHNSEnabled); Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false", isCheckAccessEnabled); + setTestUserFs(); checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID); checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET); checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 1ff3458fdb..44b1685a3f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -20,19 +20,32 @@ import java.io.FileNotFoundException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.junit.Test; import org.mockito.Mockito; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.ArgumentMatchers.any; + /** * 
Test filesystem initialization and creation. */ @@ -73,11 +86,11 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception { TracingContext tracingContext = getSampleTracingContext(fs, true); Mockito.doReturn(Mockito.mock(AbfsRestOperation.class)) .when(client) - .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class)); + .getAclStatus(Mockito.anyString(), any(TracingContext.class)); store.getIsNamespaceEnabled(tracingContext); Mockito.verify(client, Mockito.times(1)) - .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class)); + .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } @Test @@ -96,6 +109,31 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { store.getIsNamespaceEnabled(tracingContext); Mockito.verify(client, Mockito.times(0)) - .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class)); + .getAclStatus(Mockito.anyString(), any(TracingContext.class)); + } + + // Todo: [FnsOverBlob] Remove this test case once Blob Endpoint Support is ready and enabled. + @Test + public void testFileSystemInitFailsWithBlobEndpointUrl() throws Exception { + Configuration configuration = getRawConfiguration(); + String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); + String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); + intercept(InvalidConfigurationValueException.class, + "Blob Endpoint Support not yet available", () -> + FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); + } + + @Test + public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exception { + AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance( + getRawConfiguration())); + AzureBlobFileSystem mockedFs = Mockito.spy(fs); + Mockito.doThrow( + new AbfsRestOperationException(HTTP_UNAVAILABLE, "Throttled", + "Throttled", null)).when(mockedFs).getIsNamespaceEnabled(any()); + + intercept(AzureBlobFileSystemException.class, + FS_AZURE_ACCOUNT_IS_HNS_ENABLED, () -> + mockedFs.initialize(fs.getUri(), getRawConfiguration())); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java index d168ed3884..d4c58c9705 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -140,12 +141,15 @@ private String getNonExistingUrl() { @Test public void testFailedRequestWhenFSNotExist() throws Exception { + assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT); AbfsConfiguration config = this.getConfiguration(); config.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false); String testUri = this.getTestUrl(); String nonExistingFsUrl = getAbfsScheme() + "://" + UUID.randomUUID() + testUri.substring(testUri.indexOf("@")); + config.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isUsingXNSAccount); AzureBlobFileSystem fs =
this.getFileSystem(nonExistingFsUrl); + fs.getAbfsStore().setNamespaceEnabled(Trilean.UNKNOWN); intercept(FileNotFoundException.class, "\"The specified filesystem does not exist.\", 404", @@ -214,12 +218,14 @@ private void unsetConfAndEnsureGetAclCallIsMadeOnce() throws IOException { private AbfsClient callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient() throws IOException { - final AzureBlobFileSystem abfs = this.getFileSystem(); - final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore(); - final AbfsClient mockClient = mock(AbfsClient.class); + final AzureBlobFileSystem abfs = Mockito.spy(this.getFileSystem()); + final AzureBlobFileSystemStore abfsStore = Mockito.spy(abfs.getAbfsStore()); + final AbfsClient mockClient = mock(AbfsDfsClient.class); + doReturn(abfsStore).when(abfs).getAbfsStore(); + doReturn(mockClient).when(abfsStore).getClient(); + doReturn(mockClient).when(abfsStore).getClient(any()); doReturn(mock(AbfsRestOperation.class)).when(mockClient) .getAclStatus(anyString(), any(TracingContext.class)); - abfsStore.setClient(mockClient); getIsNamespaceEnabled(abfs); return mockClient; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 909e7cf174..81897a5687 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -160,7 +160,8 @@ private String getUserAgentString(AbfsConfiguration config, boolean includeSSLProvider) throws IOException, URISyntaxException { AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); - AbfsClient client = new AbfsClient(new URL("https://azure.com"), null, + // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), null, config, (AccessTokenProvider) null, null, abfsClientContext); String sslProviderName = null; if (includeSSLProvider) { @@ -363,7 +364,8 @@ public static AbfsClient createTestClientFromCurrentContext( .build(); // Create test AbfsClient - AbfsClient testClient = new AbfsClient( + // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + AbfsClient testClient = new AbfsDfsClient( baseAbfsClientInstance.getBaseUrl(), (currentAuthType == AuthType.SharedKey ? new SharedKeyCredentials( @@ -391,7 +393,8 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, (currentAuthType == AuthType.SharedKey) || (currentAuthType == AuthType.OAuth)); - AbfsClient client = mock(AbfsClient.class); + // Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready. + AbfsClient client = mock(AbfsDfsClient.class); AbfsPerfTracker tracker = new AbfsPerfTracker( "test", abfsConfig.getAccountName(),