HADOOP-19187: [ABFS][FNSOverBlob] AbfsClient Refactoring to Support Multiple Implementation of Clients. (#6879)

Refactor AbfsClient into DFS and Blob Client.

Contributed by Anuj Modi
This commit is contained in:
Anuj Modi 2024-08-20 22:37:07 +05:30 committed by GitHub
parent 33c9ecb652
commit b15ed27cfb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 2287 additions and 912 deletions

View File

@ -44,6 +44,8 @@
<suppressions>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/>
<suppress checks="ParameterNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
<suppress checks="ParameterNumber|MagicNumber"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]utils[\\/]Base64.java"/>
<suppress checks="ParameterNumber|VisibilityModifier"

View File

@ -22,6 +22,7 @@
import java.lang.reflect.Field;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
@ -74,6 +75,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
@ -87,6 +89,8 @@ public class AbfsConfiguration{
private final Configuration rawConfig;
private final String accountName;
// Service type identified from URL used to initialize FileSystem.
private final AbfsServiceType fsConfiguredServiceType;
private final boolean isSecure;
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);
@ -94,6 +98,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED)
private String isNamespaceEnabledAccount;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK)
private boolean isDfsToBlobFallbackEnabled;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_MAX_CONCURRENT_REQUESTS,
DefaultValue = -1)
private int writeMaxConcurrentRequestCount;
@ -408,11 +416,14 @@ public class AbfsConfiguration{
private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;
public AbfsConfiguration(final Configuration rawConfig, String accountName)
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
public AbfsConfiguration(final Configuration rawConfig,
String accountName,
AbfsServiceType fsConfiguredServiceType)
throws IllegalAccessException, IOException {
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
rawConfig, AzureBlobFileSystem.class);
this.accountName = accountName;
this.fsConfiguredServiceType = fsConfiguredServiceType;
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);
Field[] fields = this.getClass().getDeclaredFields();
@ -434,10 +445,75 @@ public AbfsConfiguration(final Configuration rawConfig, String accountName)
}
}
/**
 * Constructs an AbfsConfiguration defaulting the endpoint-derived service
 * type to DFS, for callers that do not supply one explicitly.
 * @param rawConfig raw configuration to load settings from.
 * @param accountName the Azure storage account name.
 * @throws IllegalAccessException if a configuration field cannot be accessed.
 * @throws IOException if loading credential providers fails.
 */
public AbfsConfiguration(final Configuration rawConfig, String accountName)
    throws IllegalAccessException, IOException {
  this(rawConfig, accountName, AbfsServiceType.DFS);
}
/**
 * Returns the account's namespace-enabled setting as a Trilean,
 * parsed from the configured string value.
 * @return trilean view of the namespace-enabled configuration.
 */
public Trilean getIsNamespaceEnabledAccount() {
  final String configuredValue = isNamespaceEnabledAccount;
  return Trilean.getTrilean(configuredValue);
}
/**
 * Returns the service type to be used based on the filesystem configuration.
 * An explicit override via "fs.azure.fns.account.service.type" takes
 * precedence; otherwise the type identified from the URL used to
 * initialize the filesystem is used.
 * @return the service type.
 */
public AbfsServiceType getFsConfiguredServiceType() {
  // Fall back to the URL-derived type when no FNS override is configured.
  final AbfsServiceType urlDerivedType = fsConfiguredServiceType;
  return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, urlDerivedType);
}
/**
 * Returns the service type configured for FNS accounts, used to override
 * the service type identified from the URL that initialized the filesystem.
 * @return the configured service type, or null when no override is set.
 */
public AbfsServiceType getConfiguredServiceTypeForFNSAccounts() {
  // A null default signals "no override configured".
  final AbfsServiceType noOverride = null;
  return getEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, noOverride);
}
/**
 * Returns the service type to use for ingress operations irrespective of
 * account type. Defaults to the filesystem's configured service type.
 * @return the ingress service type.
 */
public AbfsServiceType getIngressServiceType() {
  final AbfsServiceType fallback = getFsConfiguredServiceType();
  return getEnum(FS_AZURE_INGRESS_SERVICE_TYPE, fallback);
}
/**
 * Returns whether traffic should be moved from DFS to Blob. Needed when
 * the service type is DFS and operations are experiencing compatibility
 * issues on that endpoint.
 * @return true when the DFS-to-Blob fallback is enabled.
 */
public boolean isDfsToBlobFallbackEnabled() {
  return this.isDfsToBlobFallbackEnabled;
}
/**
 * Validates that the configured service type is compatible with the
 * account type in use. HNS enabled accounts cannot use the BLOB service
 * type.
 * @param isHNSEnabled whether the account has hierarchical namespace enabled.
 * @throws InvalidConfigurationValueException when the combination is invalid.
 */
public void validateConfiguredServiceType(boolean isHNSEnabled)
    throws InvalidConfigurationValueException {
  // Todo: [FnsOverBlob] - Remove this check, Failing FS Init with Blob Endpoint Until FNS over Blob is ready.
  if (getFsConfiguredServiceType() == AbfsServiceType.BLOB) {
    throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
        "Blob Endpoint Support not yet available");
  }
  if (isHNSEnabled) {
    // An explicit FNS override to BLOB contradicts the HNS account type.
    if (getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) {
      throw new InvalidConfigurationValueException(
          FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Cannot be BLOB for HNS Account");
    }
    // Likewise, a blob-endpoint URL cannot initialize an HNS account.
    if (fsConfiguredServiceType == AbfsServiceType.BLOB) {
      throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
          "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
    }
  }
}
/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
@ -478,6 +554,7 @@ public String get(String key) {
* Returns the account-specific value if it exists, then looks for an
* account-agnostic value.
* @param key Account-agnostic configuration key
* @param defaultValue Value returned if none is configured
* @return value if one exists, else the default value
*/
public String getString(String key, String defaultValue) {
@ -522,7 +599,7 @@ public int getInt(String key, int defaultValue) {
* looks for an account-agnostic value.
* @param key Account-agnostic configuration key
* @return value in String form if one exists, else null
* @throws IOException
* @throws IOException if parsing fails.
*/
public String getPasswordString(String key) throws IOException {
char[] passchars = rawConfig.getPassword(accountConf(key));

View File

@ -45,6 +45,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
@ -109,13 +110,16 @@
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
@ -213,6 +217,23 @@ public void initialize(URI uri, Configuration configuration)
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat, listener);
/*
* Validate the service type configured in the URI is valid for account type used.
* HNS Account Cannot have Blob Endpoint URI.
*/
try {
abfsConfiguration.validateConfiguredServiceType(
tryGetIsNamespaceEnabled(new TracingContext(tracingContext)));
} catch (InvalidConfigurationValueException ex) {
LOG.debug("File system configured with Invalid Service Type", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Failed to determine account type for service type validation", ex);
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}
// Create the file system if it does not exist.
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH), tracingContext) == null) {
try {
@ -230,10 +251,7 @@ public void initialize(URI uri, Configuration configuration)
*/
if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
abfsConfiguration))
&& !getIsNamespaceEnabled(
new TracingContext(clientCorrelationId, fileSystemId,
FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat,
listener))) {
&& !getIsNamespaceEnabled(new TracingContext(tracingContext))) {
/*
* Close the filesystem gracefully before throwing exception. Graceful close
* will ensure that all resources are released properly.
@ -1400,6 +1418,34 @@ private FileStatus tryGetFileStatus(final Path f, TracingContext tracingContext)
}
}
/**
 * Utility function to check if the namespace is enabled on the storage
 * account. A failure with a 4xx status other than 400 is inferred as HNS.
 * @param tracingContext tracing context
 * @return true if namespace is enabled, false otherwise.
 * @throws AzureBlobFileSystemException if any other error occurs.
 */
private boolean tryGetIsNamespaceEnabled(TracingContext tracingContext)
    throws AzureBlobFileSystemException {
  try {
    return getIsNamespaceEnabled(tracingContext);
  } catch (AbfsRestOperationException ex) {
    /*
     * Only user errors strictly between 400 and 500 can be interpreted:
     * such responses imply an HNS account. A 5xx status tells us nothing,
     * and network errors surface as status -1, so both are rethrown.
     */
    final int status = ex.getStatusCode();
    final boolean isInterpretableUserError =
        status > HTTP_BAD_REQUEST && status < HTTP_INTERNAL_ERROR;
    if (!isInterpretableUserError) {
      throw ex;
    }
    LOG.debug("getNamespace failed with non 400 user error", ex);
    statIncrement(ERROR_IGNORED);
    return true;
  }
}
private boolean fileSystemExists() throws IOException {
LOG.debug(
"AzureBlobFileSystem.fileSystemExists uri: {}", uri);
@ -1660,7 +1706,7 @@ AbfsDelegationTokenManager getDelegationTokenManager() {
/**
 * Queries the store for whether the account has a hierarchical namespace.
 * @param tracingContext tracing context for the request.
 * @return true if namespace is enabled.
 * @throws AzureBlobFileSystemException if the store call fails.
 */
@VisibleForTesting
boolean getIsNamespaceEnabled(TracingContext tracingContext)
    throws AzureBlobFileSystemException {
  return getAbfsStore().getIsNamespaceEnabled(tracingContext);
}
/**

View File

@ -29,11 +29,9 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.Instant;
@ -55,11 +53,13 @@
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.fs.PathIOException;
@ -158,6 +158,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_FOOTER_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
/**
@ -169,6 +170,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class);
private AbfsClient client;
/**
* Variable to hold the client handler which will determine the operative
* client based on the service type configured.
* Initialized in the {@link #initializeClient(URI, String, String, boolean)}.
*/
private AbfsClientHandler clientHandler;
private URI uri;
private String userName;
private String primaryUserGroup;
@ -221,7 +229,8 @@ public AzureBlobFileSystemStore(
leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
try {
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration, accountName);
this.abfsConfiguration = new AbfsConfiguration(abfsStoreBuilder.configuration,
accountName, getAbfsServiceTypeFromUrl());
} catch (IllegalAccessException exception) {
throw new FileSystemOperationUnhandledException(exception);
}
@ -286,6 +295,8 @@ public AzureBlobFileSystemStore(
/**
 * Checks if the given key in Azure Storage should be stored as an append
 * blob instead of block blob, based on the configured append-blob
 * directory set.
 * @param key The key to check.
 * @return True if the key should be stored as an append blob, false otherwise.
 */
public boolean isAppendBlobKey(String key) {
  return isKeyForDirectorySet(key, appendBlobDirSet);
}
@ -497,15 +508,9 @@ public void setFilesystemProperties(
try (AbfsPerfInfo perfInfo = startTracking("setFilesystemProperties",
"setFilesystemProperties")) {
final String commaSeparatedProperties;
try {
commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
final AbfsRestOperation op = client
.setFilesystemProperties(commaSeparatedProperties, tracingContext);
final AbfsRestOperation op = getClient()
.setFilesystemProperties(properties, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
}
@ -590,18 +595,13 @@ public void setPathProperties(final Path path,
path,
properties);
final String commaSeparatedProperties;
try {
commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties);
} catch (CharacterCodingException ex) {
throw new InvalidAbfsRestOperationException(ex);
}
final String relativePath = getRelativePath(path);
final ContextEncryptionAdapter contextEncryptionAdapter
= createEncryptionAdapterFromServerStoreContext(relativePath,
tracingContext);
final AbfsRestOperation op = client
.setPathProperties(getRelativePath(path), commaSeparatedProperties,
.setPathProperties(getRelativePath(path), properties,
tracingContext, contextEncryptionAdapter);
contextEncryptionAdapter.destroy();
perfInfo.registerResult(op.getResult()).registerSuccess(true);
@ -1810,18 +1810,29 @@ private void initializeClient(URI uri, String fileSystemName,
LOG.trace("Initializing AbfsClient for {}", baseUrl);
if (tokenProvider != null) {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
tokenProvider, encryptionContextProvider,
populateAbfsClientContext());
} else {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
this.clientHandler = new AbfsClientHandler(baseUrl, creds, abfsConfiguration,
sasTokenProvider, encryptionContextProvider,
populateAbfsClientContext());
}
this.client = getClientHandler().getClient();
LOG.trace("AbfsClient init complete");
}
/**
 * Infers the service type from the URI used to initialize the filesystem.
 * Only an explicit blob domain maps to BLOB; a DFS domain or any custom
 * endpoint defaults to DFS.
 * @return the inferred service type.
 */
private AbfsServiceType getAbfsServiceTypeFromUrl() {
  final String fsUri = uri.toString();
  if (!fsUri.contains(ABFS_BLOB_DOMAIN_NAME)) {
    // DFS domain name or any other custom endpoint: default to DFS.
    LOG.debug("Falling back to default service type DFS");
    return AbfsServiceType.DFS;
  }
  return AbfsServiceType.BLOB;
}
/**
* Populate a new AbfsClientContext instance with the desired properties.
*
@ -1861,43 +1872,6 @@ private boolean parseIsDirectory(final String resourceType) {
&& resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY);
}
/**
 * Convert properties stored in a Map into a comma separated string. For map
 * {@code <key1:value1; key2:value2; keyN:valueN>}, method would convert to:
 * {@code key1=value1,key2=value2,...,keyN=valueN} where each value is
 * Base64-encoded in the XMS properties charset.
 *
 * @param properties property map to serialize.
 * @return comma separated {@code key=base64(value)} pairs.
 * @throws CharacterCodingException if a value cannot be represented in the
 *         XMS properties charset.
 */
@VisibleForTesting
String convertXmsPropertiesToCommaSeparatedString(final Map<String,
    String> properties) throws
    CharacterCodingException {
  StringBuilder commaSeparatedProperties = new StringBuilder();
  final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder();
  for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
    String key = propertyEntry.getKey();
    String value = propertyEntry.getValue();
    if (!encoder.canEncode(value)) {
      throw new CharacterCodingException();
    }
    // Copy only [position, limit) of the encoded buffer: ByteBuffer.array()
    // exposes the whole backing array, whose capacity may exceed the encoded
    // length and would corrupt the Base64 output with trailing bytes.
    final ByteBuffer encoded = encoder.encode(CharBuffer.wrap(value));
    final byte[] encodedBytes = new byte[encoded.remaining()];
    encoded.get(encodedBytes);
    String encodedPropertyValue = Base64.encode(encodedBytes);
    commaSeparatedProperties.append(key)
        .append(AbfsHttpConstants.EQUAL)
        .append(encodedPropertyValue);
    commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
  }
  if (commaSeparatedProperties.length() != 0) {
    // Drop the trailing comma appended after the last entry.
    commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1);
  }
  return commaSeparatedProperties.toString();
}
private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws
InvalidFileSystemPropertyException, InvalidAbfsRestOperationException {
Hashtable<String, String> properties = new Hashtable<>();
@ -2176,6 +2150,16 @@ public AbfsClient getClient() {
return this.client;
}
/**
 * Returns the client for the requested service type from the client handler.
 * @param serviceType service type whose client is required.
 * @return the corresponding AbfsClient.
 */
@VisibleForTesting
public AbfsClient getClient(AbfsServiceType serviceType) {
  return getClientHandler().getClient(serviceType);
}
/**
 * Returns the handler that owns the service-type-specific clients.
 * @return the AbfsClientHandler for this store.
 */
@VisibleForTesting
public AbfsClientHandler getClientHandler() {
  return this.clientHandler;
}
@VisibleForTesting
void setClient(AbfsClient client) {
this.client = client;

View File

@ -101,6 +101,8 @@ public final class AbfsHttpConstants {
public static final String GMT_TIMEZONE = "GMT";
public static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";
public static final String XMS_PROPERTIES_ENCODING_ASCII = "ISO-8859-1";
public static final String XMS_PROPERTIES_ENCODING_UNICODE = "UTF-8";
public static final String ROOT_PATH = "/";
public static final String ACCESS_MASK = "mask:";

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.constants;
/**
 * Azure Storage offers two sets of REST APIs for interacting with the storage account.
 * <ol>
 * <li>Blob Rest API: <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api">Blob service REST API</a></li>
 * <li>Data Lake Rest API: <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups">Data Lake Storage Gen2 REST API</a></li>
 * </ol>
 */
public enum AbfsServiceType {
  /**
   * Service type to set operative endpoint as Data Lake Rest API.
   */
  DFS,
  /**
   * Service type to set operative endpoint as Blob Rest API.
   */
  BLOB
}

View File

@ -32,9 +32,35 @@ public final class ConfigurationKeys {
/**
* Config to specify if the configured account is HNS enabled or not. If
* this config is not set, getacl call is made on account filesystem root
* path to determine HNS status.
* path on DFS Endpoint to determine HNS status.
*/
public static final String FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "fs.azure.account.hns.enabled";
/**
* Config to specify which {@link AbfsServiceType} to use with HNS-Disabled Account type.
* Default value will be identified from URL used to initialize filesystem.
* This will allow an override to choose service endpoint in cases where any
* local DNS resolution is set for account and driver is unable to detect
* intended endpoint from the url used to initialize filesystem.
If Blob is configured for an HNS-Enabled account, FS init will fail.
* Value {@value} case-insensitive "DFS" or "BLOB"
*/
public static final String FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE = "fs.azure.fns.account.service.type";
/**
* Config to specify which {@link AbfsServiceType} to use only for Ingress Operations.
* Other operations will continue to move to the FS configured service endpoint.
* Value {@value} case-insensitive "DFS" or "BLOB"
*/
public static final String FS_AZURE_INGRESS_SERVICE_TYPE = "fs.azure.ingress.service.type";
/**
* Config to be set only for cases where traffic over dfs endpoint is
* experiencing compatibility issues and need to move to blob for mitigation.
* Value {@value} case-insensitive "True" or "False"
*/
public static final String FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = "fs.azure.enable.dfstoblob.fallback";
/**
* Enable or disable expect hundred continue header.
* Value: {@value}.

View File

@ -45,7 +45,8 @@ public enum FSOperationType {
SET_OWNER("SO"),
SET_ACL("SA"),
TEST_OP("TS"),
WRITE("WR");
WRITE("WR"),
INIT("IN");
private final String opCode;

View File

@ -32,6 +32,7 @@
public final class FileSystemConfigurations {
public static final String DEFAULT_FS_AZURE_ACCOUNT_IS_HNS_ENABLED = "";
public static final boolean DEFAULT_FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK = false;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";

View File

@ -38,5 +38,8 @@ public final class FileSystemUriSchemes {
public static final String WASB_SECURE_SCHEME = "wasbs";
public static final String WASB_DNS_PREFIX = "blob";
public static final String ABFS_DFS_DOMAIN_NAME = "dfs.core.windows.net";
public static final String ABFS_BLOB_DOMAIN_NAME = "blob.core.windows.net";
private FileSystemUriSchemes() {}
}

View File

@ -34,4 +34,8 @@ public InvalidConfigurationValueException(String configKey, Exception innerExcep
public InvalidConfigurationValueException(String configKey) {
super("Invalid configuration value detected for " + configKey);
}
/**
 * Creates an exception for an invalid configuration value, appending an
 * explanatory detail message to the standard prefix.
 * @param configKey the offending configuration key.
 * @param message detail on why the value is invalid.
 */
public InvalidConfigurationValueException(String configKey, String message) {
  super(String.format("Invalid configuration value detected for \"%s\". %s ", configKey, message));
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.changeUrlFromBlobToDfs;
/**
 * AbfsClientHandler is a class that provides a way to get the AbfsClient
 * based on the service type.
 */
public class AbfsClientHandler {
  public static final Logger LOG = LoggerFactory.getLogger(AbfsClientHandler.class);

  /** Service type used when callers do not request a specific client. */
  private AbfsServiceType defaultServiceType;
  /** Client bound to the DFS endpoint; the only implementation wired up so far. */
  private final AbfsDfsClient dfsAbfsClient;

  public AbfsClientHandler(final URL baseUrl,
      final SharedKeyCredentials sharedKeyCredentials,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    initServiceType(abfsConfiguration);
    this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
        abfsConfiguration, tokenProvider, null, encryptionContextProvider,
        abfsClientContext);
  }

  public AbfsClientHandler(final URL baseUrl,
      final SharedKeyCredentials sharedKeyCredentials,
      final AbfsConfiguration abfsConfiguration,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    initServiceType(abfsConfiguration);
    this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
        abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
        abfsClientContext);
  }

  /**
   * Initialize the default service type based on the user configuration.
   * @param abfsConfiguration set by user.
   */
  private void initServiceType(final AbfsConfiguration abfsConfiguration) {
    this.defaultServiceType = abfsConfiguration.getFsConfiguredServiceType();
  }

  /**
   * Get the AbfsClient for the default service type.
   * @return AbfsClient
   */
  public AbfsClient getClient() {
    return getClient(defaultServiceType);
  }

  /**
   * Get the AbfsClient for the given service type.
   * @param serviceType AbfsServiceType
   * @return AbfsClient
   */
  public AbfsClient getClient(AbfsServiceType serviceType) {
    if (serviceType == AbfsServiceType.DFS) {
      return dfsAbfsClient;
    }
    // NOTE(review): no blob client is implemented yet, so BLOB resolves to
    // null; callers are expected to have validated the service type first.
    return null;
  }

  /**
   * Create the AbfsDfsClient using the url used to configure file system.
   * If URL is for Blob endpoint, it will be converted to DFS endpoint.
   * @param baseUrl URL
   * @param creds SharedKeyCredentials
   * @param abfsConfiguration AbfsConfiguration
   * @param tokenProvider AccessTokenProvider
   * @param sasTokenProvider SASTokenProvider
   * @param encryptionContextProvider EncryptionContextProvider
   * @param abfsClientContext AbfsClientContext
   * @return AbfsDfsClient with DFS endpoint URL
   * @throws IOException if URL conversion fails.
   */
  private AbfsDfsClient createDfsClient(final URL baseUrl,
      final SharedKeyCredentials creds,
      final AbfsConfiguration abfsConfiguration,
      final AccessTokenProvider tokenProvider,
      final SASTokenProvider sasTokenProvider,
      final EncryptionContextProvider encryptionContextProvider,
      final AbfsClientContext abfsClientContext) throws IOException {
    // Always talk to the DFS endpoint: a blob URL is rewritten to its DFS twin.
    final URL dfsUrl = changeUrlFromBlobToDfs(baseUrl);
    if (tokenProvider != null) {
      LOG.debug("Creating AbfsDfsClient with access token provider using the URL: {}", dfsUrl);
      return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
          tokenProvider, encryptionContextProvider, abfsClientContext);
    }
    LOG.debug("Creating AbfsDfsClient with SAS token provider using the URL: {}", dfsUrl);
    return new AbfsDfsClient(dfsUrl, creds, abfsConfiguration,
        sasTokenProvider, encryptionContextProvider, abfsClientContext);
  }
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.utils;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
@ -30,11 +31,14 @@
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EQUAL;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SAOID;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SIGNATURE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_SKOID;
@ -169,6 +173,38 @@ public static String getMaskedUrl(URL url) {
return url.toString().replace(queryString, maskedQueryString);
}
/**
 * Changes Blob Endpoint URL to DFS Endpoint URL.
 * If the host is not a Blob endpoint, the original URL is returned.
 * Only the host component is rewritten, so an occurrence of the blob
 * domain string in the path or query is left untouched.
 * @param url to be converted.
 * @return updated URL
 * @throws InvalidUriException in case of MalformedURLException.
 */
public static URL changeUrlFromBlobToDfs(URL url) throws InvalidUriException {
  try {
    final String dfsHost =
        url.getHost().replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME);
    // getFile() carries the path plus query; storage account URLs are
    // assumed to carry no fragment — TODO confirm with callers.
    url = new URL(url.getProtocol(), dfsHost, url.getPort(), url.getFile());
  } catch (MalformedURLException ex) {
    throw new InvalidUriException(url.toString());
  }
  return url;
}
/**
 * Changes DFS Endpoint URL to Blob Endpoint URL.
 * If the host is not a DFS endpoint, the original URL is returned.
 * Only the host component is rewritten, so an occurrence of the dfs
 * domain string in the path or query is left untouched.
 * @param url to be converted.
 * @return updated URL
 * @throws InvalidUriException in case of MalformedURLException.
 */
public static URL changeUrlFromDfsToBlob(URL url) throws InvalidUriException {
  try {
    final String blobHost =
        url.getHost().replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME);
    // getFile() carries the path plus query; storage account URLs are
    // assumed to carry no fragment — TODO confirm with callers.
    url = new URL(url.getProtocol(), blobHost, url.getPort(), url.getFile());
  } catch (MalformedURLException ex) {
    throw new InvalidUriException(url.toString());
  }
  return url;
}
private UriUtils() {
}
}

View File

@ -0,0 +1,82 @@
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
# ABFS Driver for Namespace Disabled Accounts (FNS: Flat Namespace)
### Note: FNS-BLOB Support is being built and not yet ready for usage.
## Background
The ABFS driver is recommended to be used only with HNS Enabled ADLS Gen-2 accounts
for big data analytics because of being more performant and scalable.
However, to enable users of legacy WASB Driver to migrate to ABFS driver without
needing them to upgrade their general purpose V2 accounts (HNS-Disabled), Support
for FNS accounts is being added to ABFS driver.
Refer to [WASB Deprecation](./wasb.html) for more details.
## Azure Service Endpoints Used by ABFS Driver
Azure Services offers two set of endpoints for interacting with storage accounts:
1. [Azure Blob Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api) referred as Blob Endpoint
2. [Azure Data Lake Storage](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/operation-groups) referred as DFS Endpoint
The ABFS Driver by default is designed to work with DFS Endpoint only which primarily
supports HNS Enabled Accounts only.
To enable ABFS Driver to work with FNS Accounts, Support for Blob Endpoint is being added.
This is because Azure services do not recommend using DFS Endpoint for FNS Accounts.
ABFS Driver will only allow FNS Accounts to be accessed using Blob Endpoint.
HNS Enabled accounts will still use DFS Endpoint which continues to be the
recommended stack based on performance and feature capabilities.
## Configuring ABFS Driver for FNS Accounts
Following configurations will be introduced to configure ABFS Driver for FNS Accounts:
1. Account Type: Must be set to `false` to indicate FNS Account
```xml
<property>
<name>fs.azure.account.hns.enabled</name>
<value>false</value>
</property>
```
2. Account URL: It is the URL used to initialize the file system. It is either passed
directly to the file system or configured as the default URI using the `fs.defaultFS`
configuration. In both cases the URL used must be the blob endpoint URL of the account.
```xml
<property>
<name>fs.defaultFS</name>
<value>https://ACCOUNT_NAME.blob.core.windows.net</value>
</property>
```
3. Service Type for FNS Accounts: This will allow an override to choose the service
type, especially in cases where a local DNS resolution is set for the account and the driver is
unable to detect the intended endpoint from the above configured URL. If this is set
to blob for HNS Enabled Accounts, FS init will fail with an InvalidConfiguration error.
```xml
<property>
<name>fs.azure.fns.account.service.type</name>
<value>BLOB</value>
</property>
```
4. Service Type for Ingress Operations: This will allow an override to choose service
type only for Ingress Related Operations like [Create](https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id),
[Append](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id)
and [Flush](https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id). All other operations will still use the
configured service type.
```xml
<property>
<name>fs.azure.ingress.service.type</name>
<value>BLOB</value>
</property>
```

View File

@ -20,6 +20,7 @@ See also:
* [WASB](./wasb.html)
* [ABFS](./abfs.html)
* [Namespace Disabled Accounts on ABFS](./fns_blob.html)
* [Testing](./testing_azure.html)
## Introduction

View File

@ -339,8 +339,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
case SET_ATTR:
Hashtable<String, String> properties = new Hashtable<>();
properties.put("key", "{ value: valueTest }");
return client.setPathProperties(path, fs.getAbfsStore()
.convertXmsPropertiesToCommaSeparatedString(properties),
return client.setPathProperties(path, properties,
getTestTracingContext(fs, false),
createEncryptionAdapterFromServerStoreContext(path,
getTestTracingContext(fs, false), client));

View File

@ -22,6 +22,7 @@
import java.lang.reflect.Field;
import java.util.List;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.util.Lists;
import org.junit.Assume;
import org.junit.Test;
@ -97,8 +98,12 @@ private void setTestUserFs() throws Exception {
+ getAccountName(), ClientCredsTokenProvider.class.getName());
conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
false);
conf.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
// Since FS init now needs to know account type setting it before init to avoid that.
conf.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isHNSEnabled);
this.testUserFs = FileSystem.newInstance(conf);
// Resetting the namespace enabled flag to unknown after file system init.
((AzureBlobFileSystem) testUserFs).getAbfsStore().setNamespaceEnabled(
Trilean.UNKNOWN);
}
private void setTestFsConf(final String fsConfKey,
@ -306,11 +311,11 @@ public void testFsActionALL() throws Exception {
}
private void checkPrerequisites() throws Exception {
setTestUserFs();
Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
isHNSEnabled);
Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false",
isCheckAccessEnabled);
setTestUserFs();
checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_ID);
checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_CLIENT_SECRET);
checkIfConfigIsSet(FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID);

View File

@ -20,19 +20,32 @@
import java.io.FileNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.mockito.Mockito;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
/**
* Test filesystem initialization and creation.
*/
@ -73,11 +86,11 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception {
TracingContext tracingContext = getSampleTracingContext(fs, true);
Mockito.doReturn(Mockito.mock(AbfsRestOperation.class))
.when(client)
.getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
.getAclStatus(Mockito.anyString(), any(TracingContext.class));
store.getIsNamespaceEnabled(tracingContext);
Mockito.verify(client, Mockito.times(1))
.getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
.getAclStatus(Mockito.anyString(), any(TracingContext.class));
}
@Test
@ -96,6 +109,31 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception {
store.getIsNamespaceEnabled(tracingContext);
Mockito.verify(client, Mockito.times(0))
.getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
.getAclStatus(Mockito.anyString(), any(TracingContext.class));
}
// Todo: [FnsOverBlob] Remove this test case once Blob Endpoint Support is ready and enabled.
@Test
public void testFileSystemInitFailsWithBlobEndpointUrl() throws Exception {
  // Fixed typo in method name ("Endpoit" -> "Endpoint"); JUnit discovers the
  // test via the @Test annotation, so the rename has no callers to break.
  Configuration configuration = getRawConfiguration();
  String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY);
  // Derive the blob-endpoint form of the configured account URL.
  String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME);
  // FS init against a blob endpoint must be rejected until blob support is ready.
  intercept(InvalidConfigurationValueException.class,
      "Blob Endpoint Support not yet available", () ->
          FileSystem.newInstance(new Path(blobUri).toUri(), configuration));
}
@Test
public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exception {
  // Spy on a real file system so the namespace-type probe can be forced to fail.
  AzureBlobFileSystem realFs =
      (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration());
  AzureBlobFileSystem spiedFs = Mockito.spy(realFs);
  AbfsRestOperationException serviceUnavailable =
      new AbfsRestOperationException(HTTP_UNAVAILABLE, "Throttled", "Throttled", null);
  Mockito.doThrow(serviceUnavailable).when(spiedFs).getIsNamespaceEnabled(any());
  // Initialization must surface a failure that mentions the HNS configuration key.
  intercept(AzureBlobFileSystemException.class,
      FS_AZURE_ACCOUNT_IS_HNS_ENABLED, () ->
          spiedFs.initialize(realFs.getUri(), getRawConfiguration()));
}
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -140,12 +141,15 @@ private String getNonExistingUrl() {
@Test
public void testFailedRequestWhenFSNotExist() throws Exception {
assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT);
AbfsConfiguration config = this.getConfiguration();
config.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, false);
String testUri = this.getTestUrl();
String nonExistingFsUrl = getAbfsScheme() + "://" + UUID.randomUUID()
+ testUri.substring(testUri.indexOf("@"));
config.setBoolean(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, isUsingXNSAccount);
AzureBlobFileSystem fs = this.getFileSystem(nonExistingFsUrl);
fs.getAbfsStore().setNamespaceEnabled(Trilean.UNKNOWN);
intercept(FileNotFoundException.class,
"\"The specified filesystem does not exist.\", 404",
@ -214,12 +218,14 @@ private void unsetConfAndEnsureGetAclCallIsMadeOnce() throws IOException {
private AbfsClient callAbfsGetIsNamespaceEnabledAndReturnMockAbfsClient()
throws IOException {
final AzureBlobFileSystem abfs = this.getFileSystem();
final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
final AbfsClient mockClient = mock(AbfsClient.class);
final AzureBlobFileSystem abfs = Mockito.spy(this.getFileSystem());
final AzureBlobFileSystemStore abfsStore = Mockito.spy(abfs.getAbfsStore());
final AbfsClient mockClient = mock(AbfsDfsClient.class);
doReturn(abfsStore).when(abfs).getAbfsStore();
doReturn(mockClient).when(abfsStore).getClient();
doReturn(mockClient).when(abfsStore).getClient(any());
doReturn(mock(AbfsRestOperation.class)).when(mockClient)
.getAclStatus(anyString(), any(TracingContext.class));
abfsStore.setClient(mockClient);
getIsNamespaceEnabled(abfs);
return mockClient;
}

View File

@ -160,7 +160,8 @@ private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws IOException, URISyntaxException {
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
// Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), null,
config, (AccessTokenProvider) null, null, abfsClientContext);
String sslProviderName = null;
if (includeSSLProvider) {
@ -363,7 +364,8 @@ public static AbfsClient createTestClientFromCurrentContext(
.build();
// Create test AbfsClient
AbfsClient testClient = new AbfsClient(
// Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient testClient = new AbfsDfsClient(
baseAbfsClientInstance.getBaseUrl(),
(currentAuthType == AuthType.SharedKey
? new SharedKeyCredentials(
@ -391,7 +393,8 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
(currentAuthType == AuthType.SharedKey)
|| (currentAuthType == AuthType.OAuth));
AbfsClient client = mock(AbfsClient.class);
// Todo : [FnsOverBlob] Update to work with Blob Endpoint as well when Fns Over Blob is ready.
AbfsClient client = mock(AbfsDfsClient.class);
AbfsPerfTracker tracker = new AbfsPerfTracker(
"test",
abfsConfig.getAccountName(),