HADOOP-18610: [ABFS] OAuth2 Token Provider support for Azure Workload Identity (#6787)
Add support for Azure Active Directory (Azure AD) workload identities which integrate with the Kubernetes's native capabilities to federate with any external identity provider. Contributed By: Anuj Modi
This commit is contained in:
parent
bb30545583
commit
005030f7a0
@ -59,6 +59,7 @@
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
|
||||
@ -983,6 +984,20 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
||||
tokenProvider = new RefreshTokenBasedTokenProvider(authEndpoint,
|
||||
clientId, refreshToken);
|
||||
LOG.trace("RefreshTokenBasedTokenProvider initialized");
|
||||
} else if (tokenProviderClass == WorkloadIdentityTokenProvider.class) {
|
||||
String authority = appendSlashIfNeeded(
|
||||
getTrimmedPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY,
|
||||
AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_MSI_AUTHORITY));
|
||||
String tenantGuid =
|
||||
getMandatoryPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT);
|
||||
String clientId =
|
||||
getMandatoryPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||
String tokenFile =
|
||||
getTrimmedPasswordString(FS_AZURE_ACCOUNT_OAUTH_TOKEN_FILE,
|
||||
AuthConfigurations.DEFAULT_FS_AZURE_ACCOUNT_OAUTH_TOKEN_FILE);
|
||||
tokenProvider = new WorkloadIdentityTokenProvider(
|
||||
authority, tenantGuid, clientId, tokenFile);
|
||||
LOG.trace("WorkloadIdentityTokenProvider initialized");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
|
||||
}
|
||||
|
@ -39,6 +39,10 @@ public final class AuthConfigurations {
|
||||
public static final String
|
||||
DEFAULT_FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT =
|
||||
"https://login.microsoftonline.com/Common/oauth2/token";
|
||||
/** Default OAuth token file path for the workload identity flow. */
|
||||
public static final String
|
||||
DEFAULT_FS_AZURE_ACCOUNT_OAUTH_TOKEN_FILE =
|
||||
"/var/run/secrets/azure/tokens/azure-identity-token";
|
||||
|
||||
private AuthConfigurations() {
|
||||
}
|
||||
|
@ -273,6 +273,8 @@ public final class ConfigurationKeys {
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token";
|
||||
/** Key for oauth AAD refresh token endpoint: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT = "fs.azure.account.oauth2.refresh.token.endpoint";
|
||||
/** Key for oauth AAD workload identity token file path: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_TOKEN_FILE = "fs.azure.account.oauth2.token.file";
|
||||
/** Key for enabling the tracking of ABFS API latency and sending the latency numbers to the ABFS API service */
|
||||
public static final String FS_AZURE_ABFS_LATENCY_TRACK = "fs.azure.abfs.latency.track";
|
||||
|
||||
|
@ -57,6 +57,9 @@ public final class AzureADAuthenticator {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class);
|
||||
private static final String RESOURCE_NAME = "https://storage.azure.com/";
|
||||
private static final String SCOPE = "https://storage.azure.com/.default";
|
||||
private static final String JWT_BEARER_ASSERTION = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer";
|
||||
private static final String CLIENT_CREDENTIALS = "client_credentials";
|
||||
private static final String OAUTH_VERSION_2_0 = "/oauth2/v2.0/";
|
||||
private static final int CONNECT_TIMEOUT = 30 * 1000;
|
||||
private static final int READ_TIMEOUT = 30 * 1000;
|
||||
|
||||
@ -95,15 +98,14 @@ public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
|
||||
Preconditions.checkNotNull(authEndpoint, "authEndpoint");
|
||||
Preconditions.checkNotNull(clientId, "clientId");
|
||||
Preconditions.checkNotNull(clientSecret, "clientSecret");
|
||||
boolean isVersion2AuthenticationEndpoint = authEndpoint.contains("/oauth2/v2.0/");
|
||||
|
||||
QueryParams qp = new QueryParams();
|
||||
if (isVersion2AuthenticationEndpoint) {
|
||||
if (isVersion2AuthenticationEndpoint(authEndpoint)) {
|
||||
qp.add("scope", SCOPE);
|
||||
} else {
|
||||
qp.add("resource", RESOURCE_NAME);
|
||||
}
|
||||
qp.add("grant_type", "client_credentials");
|
||||
qp.add("grant_type", CLIENT_CREDENTIALS);
|
||||
qp.add("client_id", clientId);
|
||||
qp.add("client_secret", clientSecret);
|
||||
LOG.debug("AADToken: starting to fetch token using client creds for client ID " + clientId);
|
||||
@ -111,6 +113,46 @@ public static AzureADToken getTokenUsingClientCreds(String authEndpoint,
|
||||
return getTokenCall(authEndpoint, qp.serialize(), null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets Azure Active Directory token using the user ID and a JWT assertion
|
||||
* generated by a federated authentication process.
|
||||
*
|
||||
* The federation process uses a feature from Azure Active Directory
|
||||
* called workload identity. A workload identity is an identity used
|
||||
* by a software workload (such as an application, service, script,
|
||||
* or container) to authenticate and access other services and resources.
|
||||
*
|
||||
*
|
||||
* @param authEndpoint the OAuth 2.0 token endpoint associated
|
||||
* with the user's directory (obtain from
|
||||
* Active Directory configuration)
|
||||
* @param clientId the client ID (GUID) of the client web app
|
||||
* obtained from Azure Active Directory configuration
|
||||
* @param clientAssertion the JWT assertion token
|
||||
* @return {@link AzureADToken} obtained using the creds
|
||||
* @throws IOException throws IOException if there is a failure in connecting to Azure AD
|
||||
*/
|
||||
public static AzureADToken getTokenUsingJWTAssertion(String authEndpoint,
|
||||
String clientId, String clientAssertion) throws IOException {
|
||||
Preconditions.checkNotNull(authEndpoint, "authEndpoint");
|
||||
Preconditions.checkNotNull(clientId, "clientId");
|
||||
Preconditions.checkNotNull(clientAssertion, "clientAssertion");
|
||||
|
||||
QueryParams qp = new QueryParams();
|
||||
if (isVersion2AuthenticationEndpoint(authEndpoint)) {
|
||||
qp.add("scope", SCOPE);
|
||||
} else {
|
||||
qp.add("resource", RESOURCE_NAME);
|
||||
}
|
||||
qp.add("grant_type", CLIENT_CREDENTIALS);
|
||||
qp.add("client_id", clientId);
|
||||
qp.add("client_assertion", clientAssertion);
|
||||
qp.add("client_assertion_type", JWT_BEARER_ASSERTION);
|
||||
LOG.debug("AADToken: starting to fetch token using client assertion for client ID " + clientId);
|
||||
|
||||
return getTokenCall(authEndpoint, qp.serialize(), null, "POST");
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets AAD token from the local virtual machine's VM extension. This only works on
|
||||
* an Azure VM with MSI extension
|
||||
@ -523,4 +565,8 @@ private static String consumeInputStream(InputStream inStream, int length) throw
|
||||
|
||||
return new String(b, 0, totalBytesRead, StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static boolean isVersion2AuthenticationEndpoint(String authEndpoint) {
|
||||
return authEndpoint.contains(OAUTH_VERSION_2_0);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,142 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
|
||||
import org.apache.hadoop.util.Preconditions;
|
||||
|
||||
/**
|
||||
* Provides tokens based on Azure AD Workload Identity.
|
||||
*/
|
||||
public class WorkloadIdentityTokenProvider extends AccessTokenProvider {
|
||||
|
||||
private static final String OAUTH2_TOKEN_PATH = "/oauth2/v2.0/token";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);
|
||||
private static final String EMPTY_TOKEN_FILE_ERROR = "Empty token file found at specified path: ";
|
||||
private static final String TOKEN_FILE_READ_ERROR = "Error reading token file at specified path: ";
|
||||
|
||||
private final String authEndpoint;
|
||||
private final String clientId;
|
||||
private final String tokenFile;
|
||||
private long tokenFetchTime = -1;
|
||||
|
||||
public WorkloadIdentityTokenProvider(final String authority, final String tenantId,
|
||||
final String clientId, final String tokenFile) {
|
||||
Preconditions.checkNotNull(authority, "authority");
|
||||
Preconditions.checkNotNull(tenantId, "tenantId");
|
||||
Preconditions.checkNotNull(clientId, "clientId");
|
||||
Preconditions.checkNotNull(tokenFile, "tokenFile");
|
||||
|
||||
this.authEndpoint = authority + tenantId + OAUTH2_TOKEN_PATH;
|
||||
this.clientId = clientId;
|
||||
this.tokenFile = tokenFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureADToken refreshToken() throws IOException {
|
||||
LOG.debug("AADToken: refreshing token from JWT Assertion");
|
||||
String clientAssertion = getClientAssertion();
|
||||
AzureADToken token = getTokenUsingJWTAssertion(clientAssertion);
|
||||
tokenFetchTime = System.currentTimeMillis();
|
||||
return token;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the token is about to expire as per base expiry logic.
|
||||
* Otherwise, expire if there is a clock skew issue in the system.
|
||||
*
|
||||
* @return true if the token is expiring in next 1 hour or if a token has
|
||||
* never been fetched
|
||||
*/
|
||||
@Override
|
||||
protected boolean isTokenAboutToExpire() {
|
||||
if (tokenFetchTime == -1 || super.isTokenAboutToExpire()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// In case of, any clock skew issues, refresh token.
|
||||
long elapsedTimeSinceLastTokenRefreshInMillis =
|
||||
System.currentTimeMillis() - tokenFetchTime;
|
||||
boolean expiring = elapsedTimeSinceLastTokenRefreshInMillis < 0;
|
||||
if (expiring) {
|
||||
// Clock Skew issue. Refresh token.
|
||||
LOG.debug("JWTToken: token renewing. Time elapsed since last token fetch:"
|
||||
+ " {} milliseconds", elapsedTimeSinceLastTokenRefreshInMillis);
|
||||
}
|
||||
|
||||
return expiring;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the client assertion from the token file.
|
||||
* The token file should contain the client assertion in JWT format.
|
||||
* It should be a String containing Base64Url encoded JSON Web Token (JWT).
|
||||
* See <a href="https://azure.github.io/azure-workload-identity/docs/faq.html#does-workload-identity-work-in-disconnected-environments">
|
||||
* Azure Workload Identity FAQ</a>.
|
||||
*
|
||||
* @return the client assertion.
|
||||
* @throws IOException if the token file is empty.
|
||||
*/
|
||||
private String getClientAssertion()
|
||||
throws IOException {
|
||||
String clientAssertion = "";
|
||||
try {
|
||||
File file = new File(tokenFile);
|
||||
clientAssertion = FileUtils.readFileToString(file, "UTF-8");
|
||||
} catch (Exception e) {
|
||||
throw new IOException(TOKEN_FILE_READ_ERROR + tokenFile, e);
|
||||
}
|
||||
if (Strings.isNullOrEmpty(clientAssertion)) {
|
||||
throw new IOException(EMPTY_TOKEN_FILE_ERROR + tokenFile);
|
||||
}
|
||||
return clientAssertion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Azure AD token from a client assertion in JWT format.
|
||||
* This method exists to make unit testing possible.
|
||||
*
|
||||
* @param clientAssertion the client assertion.
|
||||
* @return the Azure AD token.
|
||||
* @throws IOException if there is a failure in connecting to Azure AD.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
AzureADToken getTokenUsingJWTAssertion(String clientAssertion) throws IOException {
|
||||
return AzureADAuthenticator
|
||||
.getTokenUsingJWTAssertion(authEndpoint, clientId, clientAssertion);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the last time the token was fetched from the token file.
|
||||
* This method exists to make unit testing possible.
|
||||
*
|
||||
* @return the time the token was last fetched.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
long getTokenFetchTime() {
|
||||
return tokenFetchTime;
|
||||
}
|
||||
}
|
@ -321,10 +321,9 @@ What can be changed is what secrets/credentials are used to authenticate the cal
|
||||
|
||||
The authentication mechanism is set in `fs.azure.account.auth.type` (or the
|
||||
account specific variant). The possible values are SharedKey, OAuth, Custom
|
||||
and SAS. For the various OAuth options use the config `fs.azure.account
|
||||
.oauth.provider.type`. Following are the implementations supported
|
||||
ClientCredsTokenProvider, UserPasswordTokenProvider, MsiTokenProvider and
|
||||
RefreshTokenBasedTokenProvider. An IllegalArgumentException is thrown if
|
||||
and SAS. For the various OAuth options use the config `fs.azure.account.oauth.provider.type`. Following are the implementations supported
|
||||
ClientCredsTokenProvider, UserPasswordTokenProvider, MsiTokenProvider,
|
||||
RefreshTokenBasedTokenProvider and WorkloadIdentityTokenProvider. An IllegalArgumentException is thrown if
|
||||
the specified provider type is not one of the supported.
|
||||
|
||||
All secrets can be stored in JCEKS files. These are encrypted and password
|
||||
@ -561,6 +560,54 @@ The Azure Portal/CLI is used to create the service identity.
|
||||
</property>
|
||||
```
|
||||
|
||||
### <a name="workload-identity"></a> Azure Workload Identity
|
||||
|
||||
[Azure Workload Identities](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview), formerly "Azure AD pod identity".
|
||||
|
||||
OAuth 2.0 tokens are written to a file that is only accessible
|
||||
from the executing pod (`/var/run/secrets/azure/tokens/azure-identity-token`).
|
||||
The issued credentials can be used to authenticate.
|
||||
|
||||
The Azure Portal/CLI is used to create the service identity.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.account.auth.type</name>
|
||||
<value>OAuth</value>
|
||||
<description>
|
||||
Use OAuth authentication
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>fs.azure.account.oauth.provider.type</name>
|
||||
<value>org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider</value>
|
||||
<description>
|
||||
Use Workload Identity for issuing OAuth tokens
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.msi.tenant</name>
|
||||
<value>${env.AZURE_TENANT_ID}</value>
|
||||
<description>
|
||||
Optional MSI Tenant ID
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.client.id</name>
|
||||
<value>${env.AZURE_CLIENT_ID}</value>
|
||||
<description>
|
||||
Optional Client ID
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.token.file</name>
|
||||
<value>${env.AZURE_FEDERATED_TOKEN_FILE}</value>
|
||||
<description>
|
||||
Token file path
|
||||
</description>
|
||||
</property>
|
||||
```
|
||||
|
||||
### Custom OAuth 2.0 Token Provider
|
||||
|
||||
A Custom OAuth 2.0 token provider supplies the ABFS connector with an OAuth 2.0
|
||||
|
@ -879,6 +879,42 @@ hierarchical namespace enabled, and set the following configuration settings:
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--2.5. If "WorkloadIdentityTokenProvider" is set as key provider, uncomment below and
|
||||
set tenant, client id and token file path.
|
||||
|
||||
All service principals must have federated identity credentials for Kubernetes.
|
||||
See Azure docs: https://learn.microsoft.com/en-us/azure/active-directory/workload-identities/workload-identity-federation-create-trust?pivots=identity-wif-apps-methods-azp#kubernetes
|
||||
|
||||
Retrieve the Azure identity token from kubernetes:
|
||||
1. Create AKS cluster with Workload Identity: https://learn.microsoft.com/en-us/azure/aks/workload-identity-deploy-cluster
|
||||
2. Create the pod:
|
||||
kubectl apply -f src/test/resources/workload-identity-pod.yaml
|
||||
3. After the pod is running, retrieve the identity token from the pod logs:
|
||||
kubectl logs pod/workload-identity
|
||||
4. Save the identity token to the token file path specified below.
|
||||
|
||||
The Azure identity token expires after 1 hour.
|
||||
-->
|
||||
<!--
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.msi.tenant.{ABFS_ACCOUNT_NAME}</name>
|
||||
<value>{tenantGuid}</value>
|
||||
<description>msi tenantGuid.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.client.id.{ABFS_ACCOUNT_NAME}</name>
|
||||
<value>{client id}</value>
|
||||
<description>AAD client id.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.azure.account.oauth2.client.token.file.{ABFS_ACCOUNT_NAME}</name>
|
||||
<value>{token file path}</value>
|
||||
<description>Azure identity token file path.</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>fs.azure.identity.transformer.enable.short.name</name>
|
||||
|
@ -566,16 +566,7 @@ protected void assumeValidAuthConfigsPresent() {
|
||||
currentAuthType == AuthType.SAS);
|
||||
if (currentAuthType == AuthType.SharedKey) {
|
||||
assumeValidTestConfigPresent(getRawConfiguration(), FS_AZURE_ACCOUNT_KEY);
|
||||
} else if (currentAuthType == AuthType.OAuth) {
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET);
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
||||
} else if (currentAuthType == AuthType.Custom) {
|
||||
} else {
|
||||
assumeValidTestConfigPresent(getRawConfiguration(),
|
||||
FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME);
|
||||
}
|
||||
|
@ -31,7 +31,6 @@
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.MockSASTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
@ -80,7 +79,7 @@ public void testSASTokenProviderInitializeException() throws Exception {
|
||||
testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, TEST_ERR_AUTHZ_CLASS);
|
||||
testConfig.set(MOCK_SASTOKENPROVIDER_FAIL_INIT, "true");
|
||||
|
||||
intercept(TokenAccessProviderException.class,
|
||||
intercept(SASTokenProviderException.class,
|
||||
()-> {
|
||||
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
|
||||
});
|
||||
|
@ -29,6 +29,10 @@
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
@ -40,6 +44,10 @@
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_USER_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -65,13 +73,38 @@ public class TestAccountConfiguration {
|
||||
private static final String TEST_OAUTH_ENDPOINT = "oauthEndpoint";
|
||||
private static final String TEST_CLIENT_ID = "clientId";
|
||||
private static final String TEST_CLIENT_SECRET = "clientSecret";
|
||||
private static final String TEST_USER_NAME = "userName";
|
||||
private static final String TEST_USER_PASSWORD = "userPassword";
|
||||
private static final String TEST_MSI_TENANT = "msiTenant";
|
||||
private static final String TEST_REFRESH_TOKEN = "refreshToken";
|
||||
|
||||
private static final List<String> CONFIG_KEYS =
|
||||
private static final List<String> CLIENT_CREDENTIAL_OAUTH_CONFIG_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT,
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID,
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET));
|
||||
|
||||
private static final List<String> USER_PASSWORD_OAUTH_CONFIG_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT,
|
||||
FS_AZURE_ACCOUNT_OAUTH_USER_NAME,
|
||||
FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD));
|
||||
|
||||
private static final List<String> MSI_TOKEN_OAUTH_CONFIG_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT,
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID));
|
||||
|
||||
private static final List<String> REFRESH_TOKEN_OAUTH_CONFIG_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN,
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID));
|
||||
|
||||
private static final List<String> WORKLOAD_IDENTITY_OAUTH_CONFIG_KEYS =
|
||||
Collections.unmodifiableList(Arrays.asList(
|
||||
FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT,
|
||||
FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID));
|
||||
|
||||
@Test
|
||||
public void testStringPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
@ -374,14 +407,24 @@ public void testAccessTokenProviderPrecedence()
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigPropNotFound() throws Throwable {
|
||||
public void testOAuthConfigPropNotFound() throws Throwable {
|
||||
testConfigPropNotFound(CLIENT_CREDENTIAL_OAUTH_CONFIG_KEYS, ClientCredsTokenProvider.class.getName());
|
||||
testConfigPropNotFound(USER_PASSWORD_OAUTH_CONFIG_KEYS, UserPasswordTokenProvider.class.getName());
|
||||
testConfigPropNotFound(MSI_TOKEN_OAUTH_CONFIG_KEYS, MsiTokenProvider.class.getName());
|
||||
testConfigPropNotFound(REFRESH_TOKEN_OAUTH_CONFIG_KEYS, RefreshTokenBasedTokenProvider.class.getName());
|
||||
testConfigPropNotFound(WORKLOAD_IDENTITY_OAUTH_CONFIG_KEYS, WorkloadIdentityTokenProvider.class.getName());
|
||||
|
||||
}
|
||||
|
||||
private void testConfigPropNotFound(List<String> configKeys,
|
||||
String tokenProviderClassName)throws Throwable {
|
||||
final String accountName = "account";
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName);
|
||||
|
||||
for (String key : CONFIG_KEYS) {
|
||||
setAuthConfig(abfsConf, true, AuthType.OAuth);
|
||||
for (String key : configKeys) {
|
||||
setAuthConfig(abfsConf, true, AuthType.OAuth, tokenProviderClassName);
|
||||
abfsConf.unset(key);
|
||||
abfsConf.unset(key + "." + accountName);
|
||||
testMissingConfigKey(abfsConf, key);
|
||||
@ -408,13 +451,13 @@ public void testGlobalAndAccountOAuthPrecedence(AbfsConfiguration abfsConf,
|
||||
if (globalAuthType == null) {
|
||||
unsetAuthConfig(abfsConf, false);
|
||||
} else {
|
||||
setAuthConfig(abfsConf, false, globalAuthType);
|
||||
setAuthConfig(abfsConf, false, globalAuthType, TEST_OAUTH_PROVIDER_CLASS_CONFIG);
|
||||
}
|
||||
|
||||
if (accountSpecificAuthType == null) {
|
||||
unsetAuthConfig(abfsConf, true);
|
||||
} else {
|
||||
setAuthConfig(abfsConf, true, accountSpecificAuthType);
|
||||
setAuthConfig(abfsConf, true, accountSpecificAuthType, TEST_OAUTH_PROVIDER_CLASS_CONFIG);
|
||||
}
|
||||
|
||||
// If account specific AuthType is present, precedence is always for it.
|
||||
@ -445,7 +488,7 @@ public void testGlobalAndAccountOAuthPrecedence(AbfsConfiguration abfsConf,
|
||||
|
||||
public void setAuthConfig(AbfsConfiguration abfsConf,
|
||||
boolean isAccountSetting,
|
||||
AuthType authType) {
|
||||
AuthType authType, String tokenProviderClassName) {
|
||||
final String accountNameSuffix = "." + abfsConf.getAccountName();
|
||||
String authKey = FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME
|
||||
+ (isAccountSetting ? accountNameSuffix : "");
|
||||
@ -456,8 +499,9 @@ public void setAuthConfig(AbfsConfiguration abfsConf,
|
||||
case OAuth:
|
||||
providerClassKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME
|
||||
+ (isAccountSetting ? accountNameSuffix : "");
|
||||
providerClassValue = TEST_OAUTH_PROVIDER_CLASS_CONFIG;
|
||||
providerClassValue = tokenProviderClassName;
|
||||
|
||||
setOAuthConfigs(abfsConf, isAccountSetting, tokenProviderClassName);
|
||||
abfsConf.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT
|
||||
+ ((isAccountSetting) ? accountNameSuffix : ""),
|
||||
TEST_OAUTH_ENDPOINT);
|
||||
@ -488,6 +532,45 @@ public void setAuthConfig(AbfsConfiguration abfsConf,
|
||||
abfsConf.set(providerClassKey, providerClassValue);
|
||||
}
|
||||
|
||||
private void setOAuthConfigs(AbfsConfiguration abfsConfig, boolean isAccountSettings, String tokenProviderClassName) {
|
||||
String accountNameSuffix = isAccountSettings ? ("." + abfsConfig.getAccountName()) : "";
|
||||
|
||||
if (tokenProviderClassName.equals(ClientCredsTokenProvider.class.getName())) {
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountNameSuffix,
|
||||
TEST_OAUTH_ENDPOINT);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountNameSuffix,
|
||||
TEST_CLIENT_ID);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountNameSuffix,
|
||||
TEST_CLIENT_SECRET);
|
||||
}
|
||||
if (tokenProviderClassName.equals(UserPasswordTokenProvider.class.getName())) {
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountNameSuffix,
|
||||
TEST_OAUTH_ENDPOINT);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountNameSuffix,
|
||||
TEST_USER_NAME);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountNameSuffix,
|
||||
TEST_USER_PASSWORD);
|
||||
}
|
||||
if (tokenProviderClassName.equals(MsiTokenProvider.class.getName())) {
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountNameSuffix,
|
||||
TEST_MSI_TENANT);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountNameSuffix,
|
||||
TEST_CLIENT_ID);
|
||||
}
|
||||
if (tokenProviderClassName.equals(RefreshTokenBasedTokenProvider.class.getName())) {
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountNameSuffix,
|
||||
TEST_REFRESH_TOKEN);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountNameSuffix,
|
||||
TEST_CLIENT_ID);
|
||||
}
|
||||
if (tokenProviderClassName.equals(WorkloadIdentityTokenProvider.class.getName())) {
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountNameSuffix,
|
||||
TEST_MSI_TENANT);
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountNameSuffix,
|
||||
TEST_CLIENT_ID);
|
||||
}
|
||||
}
|
||||
|
||||
private void unsetAuthConfig(AbfsConfiguration abfsConf, boolean isAccountSettings) {
|
||||
String accountNameSuffix =
|
||||
isAccountSettings ? ("." + abfsConf.getAccountName()) : "";
|
||||
@ -499,6 +582,10 @@ private void unsetAuthConfig(AbfsConfiguration abfsConf, boolean isAccountSettin
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountNameSuffix);
|
||||
abfsConf.unset(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountNameSuffix);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,144 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
|
||||
/**
|
||||
* Test the refresh logic of workload identity tokens.
|
||||
*/
|
||||
public class TestWorkloadIdentityTokenProvider extends AbstractAbfsTestWithTimeout {
|
||||
|
||||
private static final String AUTHORITY = "authority";
|
||||
private static final String TENANT_ID = "00000000-0000-0000-0000-000000000000";
|
||||
private static final String CLIENT_ID = "00000000-0000-0000-0000-000000000000";
|
||||
private static final String TOKEN_FILE = "/tmp/does_not_exist";
|
||||
private static final String CLIENT_ASSERTION = "dummy-client-assertion";
|
||||
private static final String TOKEN = "dummy-token";
|
||||
private static final long FEW_SECONDS = 5 * 1000;
|
||||
private static final long ONE_MINUTE = 60 * 1000;
|
||||
private static final long FIVE_MINUTES = 5 * ONE_MINUTE;
|
||||
|
||||
public TestWorkloadIdentityTokenProvider() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the token starts as expired.
|
||||
*/
|
||||
@Test
|
||||
public void testTokenStartsAsExpired() {
|
||||
WorkloadIdentityTokenProvider provider = new WorkloadIdentityTokenProvider(
|
||||
AUTHORITY, TENANT_ID, CLIENT_ID, TOKEN_FILE);
|
||||
|
||||
Assertions.assertThat(provider.isTokenAboutToExpire())
|
||||
.describedAs("Token should start as expired")
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTokenFetchAndExpiry() throws Exception{
|
||||
long startTime = System.currentTimeMillis();
|
||||
AzureADToken adToken = new AzureADToken();
|
||||
adToken.setAccessToken(TOKEN);
|
||||
adToken.setExpiry(new Date(System.currentTimeMillis() + FEW_SECONDS + FIVE_MINUTES));
|
||||
|
||||
File tokenFile = File.createTempFile(TOKEN_FILE, "txt");
|
||||
FileUtils.write(tokenFile, CLIENT_ASSERTION, StandardCharsets.UTF_8);
|
||||
|
||||
WorkloadIdentityTokenProvider mockedTokenProvider = Mockito.spy(
|
||||
new WorkloadIdentityTokenProvider(AUTHORITY, TENANT_ID, CLIENT_ID,
|
||||
tokenFile.getPath()));
|
||||
Mockito.doReturn(adToken).when(mockedTokenProvider).getTokenUsingJWTAssertion(CLIENT_ASSERTION);
|
||||
|
||||
// Token should be expired first and fetched
|
||||
Assertions.assertThat(mockedTokenProvider.isTokenAboutToExpire())
|
||||
.describedAs("Token should not be expired")
|
||||
.isTrue();
|
||||
Assertions.assertThat(mockedTokenProvider.getToken().getAccessToken())
|
||||
.describedAs("Token should be fetched")
|
||||
.isEqualTo(TOKEN);
|
||||
Assertions.assertThat(mockedTokenProvider.getTokenFetchTime())
|
||||
.describedAs("Token should not be expired")
|
||||
.isGreaterThan(startTime);
|
||||
|
||||
// Token should be valid for few seconds.
|
||||
Assertions.assertThat(mockedTokenProvider.isTokenAboutToExpire())
|
||||
.describedAs("Token should not be expired")
|
||||
.isFalse();
|
||||
|
||||
// Token should be expired after few seconds.
|
||||
Thread.sleep(FEW_SECONDS);
|
||||
Assertions.assertThat(mockedTokenProvider.isTokenAboutToExpire())
|
||||
.describedAs("Token should be expired")
|
||||
.isTrue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that an exception is thrown when the token file is empty.
|
||||
*
|
||||
* @throws IOException if file I/O fails.
|
||||
*/
|
||||
@Test
|
||||
public void testTokenFetchWithEmptyTokenFile() throws Exception {
|
||||
File tokenFile = File.createTempFile("azure-identity-token", "txt");
|
||||
AzureADToken azureAdToken = new AzureADToken();
|
||||
WorkloadIdentityTokenProvider tokenProvider = Mockito.spy(
|
||||
new WorkloadIdentityTokenProvider(AUTHORITY, TENANT_ID, CLIENT_ID, tokenFile.getPath()));
|
||||
Mockito.doReturn(azureAdToken)
|
||||
.when(tokenProvider).getTokenUsingJWTAssertion(TOKEN);
|
||||
IOException ex = intercept(IOException.class, () -> {
|
||||
tokenProvider.getToken();
|
||||
});
|
||||
Assertions.assertThat(ex.getMessage())
|
||||
.describedAs("Exception should be thrown when the token file is empty")
|
||||
.contains("Empty token file");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that an exception is thrown when the token file is not present.
|
||||
*
|
||||
* @throws IOException if file I/O fails.
|
||||
*/
|
||||
@Test
|
||||
public void testTokenFetchWithTokenFileNotFound() throws Exception {
|
||||
AzureADToken azureAdToken = new AzureADToken();
|
||||
WorkloadIdentityTokenProvider tokenProvider = Mockito.spy(
|
||||
new WorkloadIdentityTokenProvider(AUTHORITY, TENANT_ID, CLIENT_ID, TOKEN_FILE));
|
||||
Mockito.doReturn(azureAdToken)
|
||||
.when(tokenProvider).getTokenUsingJWTAssertion(TOKEN);
|
||||
IOException ex = intercept(IOException.class, () -> {
|
||||
tokenProvider.getToken();
|
||||
});
|
||||
Assertions.assertThat(ex.getMessage())
|
||||
.describedAs("Exception should be thrown when the token file not found")
|
||||
.contains("Error reading token file");
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
@ -0,0 +1,32 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: workload-identity
|
||||
# Namespace must match federated identity credential in Azure AD.
|
||||
namespace: default
|
||||
labels:
|
||||
azure.workload.identity/use: "true"
|
||||
spec:
|
||||
# Service account name must match federated identity credential in Azure AD.
|
||||
serviceAccountName: default
|
||||
containers:
|
||||
- name: busybox
|
||||
image: busybox
|
||||
args:
|
||||
- cat
|
||||
- /var/run/secrets/azure/tokens/azure-identity-token
|
||||
restartPolicy: Never
|
Loading…
Reference in New Issue
Block a user