diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index 2597f58b3a..e6641e351d 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -262,7 +262,22 @@ mockito-core test - + + org.apache.hadoop + hadoop-minikdc + test + + + + org.bouncycastle + bcprov-jdk15on + test + + + org.bouncycastle + bcpkix-jdk15on + test + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 67055c54fd..5c348b839a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -514,7 +514,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio } catch(IllegalArgumentException e) { throw e; } catch (Exception e) { - throw new TokenAccessProviderException("Unable to load custom token provider class.", e); + throw new TokenAccessProviderException("Unable to load custom token provider class: " + e, e); } } else { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index e321e9e88e..107465a1c3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -70,6 +70,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; @@ -121,6 +122,8 @@ public void initialize(URI uri, Configuration configuration) if (this.delegationTokenEnabled) { LOG.debug("Initializing DelegationTokenManager for {}", uri); this.delegationTokenManager = abfsConfiguration.getDelegationTokenManager(); + delegationTokenManager.bind(getUri(), configuration); + LOG.debug("Created DelegationTokenManager {}", delegationTokenManager); } } @@ -419,9 +422,10 @@ public synchronized void close() throws IOException { if (isClosed) { return; } - + // does all the delete-on-exit calls, and may be slow. super.close(); LOG.debug("AzureBlobFileSystem.close"); + IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager); this.isClosed = true; } @@ -1023,6 +1027,20 @@ public synchronized Token getDelegationToken(final String renewer) throws IOE : super.getDelegationToken(renewer); } + /** + * If Delegation tokens are enabled, the canonical service name of + * this filesystem is the filesystem URI. + * @return either the filesystem URI as a string, or null. + */ + @Override + public String getCanonicalServiceName() { + String name = null; + if (delegationTokenManager != null) { + name = delegationTokenManager.getCanonicalServiceName(); + } + return name != null ? name : super.getCanonicalServiceName(); + } + @VisibleForTesting FileSystem.Statistics getFsStatistics() { return this.statistics; @@ -1053,6 +1071,15 @@ AbfsClient getAbfsClient() { return abfsStore.getClient(); } + /** + * Get any Delegation Token manager created by the filesystem. + * @return the DT manager or null. + */ + @VisibleForTesting + AbfsDelegationTokenManager getDelegationTokenManager() { + return delegationTokenManager; + } + @VisibleForTesting boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException { return abfsStore.getIsNamespaceEnabled(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index dbf78ecb02..f8360432ad 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs.azurebfs; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -66,6 +67,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; @@ -84,6 +86,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.client.utils.URIBuilder; import org.slf4j.Logger; @@ -95,7 +98,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Evolving -public class AzureBlobFileSystemStore { +public class AzureBlobFileSystemStore implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class); private AbfsClient client; @@ -163,6 +166,11 @@ public String getPrimaryGroup() { return this.primaryUserGroup; } + @Override + public void close() throws IOException { + IOUtils.cleanupWithLogger(LOG, client); + } + private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException { final String authority = uri.getRawAuthority(); if (null == authority) { @@ -788,7 +796,8 @@ public boolean isAtomicRenameKey(String key) { return isKeyForDirectorySet(key, azureAtomicRenameDirSet); } - private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException { + private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) + throws IOException { if (this.client != null) { return; } @@ -817,6 +826,8 @@ private void initializeClient(URI uri, String fileSystemName, String accountName abfsConfiguration.getStorageAccountKey()); } else { tokenProvider = abfsConfiguration.getTokenProvider(); + ExtensionHelper.bind(tokenProvider, uri, + abfsConfiguration.getRawConfiguration()); } this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java index 36f7589f16..73b98942d0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsRestOperationException.java @@ -53,7 +53,7 @@ public AbfsRestOperationException( final String errorMessage, final Exception innerException, final AbfsHttpOperation abfsHttpOperation) { - super(formatMessage(abfsHttpOperation)); + super(formatMessage(abfsHttpOperation), innerException); this.statusCode = statusCode; this.errorCode = AzureServiceErrorCode.getAzureServiceCode(this.statusCode, errorCode); @@ -61,7 +61,7 @@ public AbfsRestOperationException( } public AbfsRestOperationException(final HttpException innerException) { - super(innerException.getMessage()); + super(innerException.getMessage(), innerException); this.statusCode = innerException.getHttpErrorCode(); this.errorCode = AzureServiceErrorCode.UNKNOWN; @@ -100,4 +100,4 @@ private static String formatMessage(final AbfsHttpOperation abfsHttpOperation) { // Remove break line to ensure the request id and timestamp can be shown in console. abfsHttpOperation.getStorageErrorMessage().replaceAll("\\n", " ")); } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java index b40b34ac13..50dcc534aa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java @@ -32,5 +32,6 @@ public TokenAccessProviderException(String message) { public TokenAccessProviderException(String message, Throwable cause) { super(message); + initCause(cause); } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/BoundDTExtension.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/BoundDTExtension.java new file mode 100644 index 0000000000..908d8c5ccc --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/BoundDTExtension.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + * An optional extension for custom extensions, so as to support + * tighter integration. + * + * This interface can be implemented by either of a + * {@link CustomDelegationTokenManager} or a {@link CustomTokenProviderAdaptee}. + * + * In both cases, extra lifecycle operation will be invoked. + * + *
    + *
  1. {@link #bind(URI, Configuration)} will + * be invoked after {@code initialize()}
  2. + *
  3. {@link Closeable#close()} will be invoked + * when the Filesystem instance is closed.
  4. + *
+ * + * The {@link #getCanonicalServiceName()} will be invoked on a Custom + * DT provider when the filesystem is asked for a Canonical Service Name. + * + * The {@link #getUserAgentSuffix()} is invoked on a CustomTokenProviderAdaptee + * as the filesystem is initialized; the User Agent Suffix which it returns + * is included in the UA header used for the ABFS Client -and so logged + * in the ABFS access logs. + * + * This allows for token providers to to provide extra information + * about the caller for use in auditing requests. + */ +@InterfaceAudience.LimitedPrivate("authorization-subsystems") +@InterfaceStability.Unstable +public interface BoundDTExtension extends Closeable { + + /** + * Bind the extension to the specific instance of ABFS. + * This happens during the ABFS's own initialization logic; it is unlikely + * to be completely instantiated at this point. + * Therefore, while a reference may be cached, implementations MUST NOT + * invoke methods on it. + * @param fsURI URI of the filesystem. + * @param conf configuration of this extension. + * @throws IOException failure during binding. + */ + void bind(URI fsURI, Configuration conf) throws IOException; + + /** + * Get the canonical service name, which will be + * returned by {@code FileSystem.getCanonicalServiceName()} and so used to + * map the issued DT in credentials, including credential files collected + * for job submission. + * + * If null is returned: fall back to the default filesystem logic. + * + * Only invoked on {@link CustomDelegationTokenManager} instances. + * @return the service name to be returned by the filesystem. + */ + default String getCanonicalServiceName() { + return null; + } + + /** + * Get a suffix for the UserAgent suffix of HTTP requests, which + * can be used to identify the principal making ABFS requests. + * @return an empty string, or a key=value string to be added to the UA + * header. + */ + default String getUserAgentSuffix() { + return ""; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/CustomDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/CustomDelegationTokenManager.java index 422f8c2511..22edb04ef2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/CustomDelegationTokenManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/CustomDelegationTokenManager.java @@ -29,6 +29,13 @@ /** * Interface for Managing the Delegation tokens. + * + * Implementations which also implement BoundDTExtension will have + * the its {@code bind()} called + * after {@code initialize)} and before any calls to + * {@link #getDelegationToken(String)}. + * It will not be bound during token renew or cancel operations: there is + * no Filesystem to bind to in those operations. */ @InterfaceAudience.LimitedPrivate("authorization-subsystems") @InterfaceStability.Unstable diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/ExtensionHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/ExtensionHelper.java new file mode 100644 index 0000000000..725c7b2257 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/ExtensionHelper.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.IOException; +import java.net.URI; +import java.util.Optional; +import java.util.function.Function; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; + +/** + * Classes to help with use of extensions, expecially those + * implementing @{@link BoundDTExtension}. + */ +@InterfaceAudience.LimitedPrivate("authorization-subsystems") +@InterfaceStability.Unstable +public final class ExtensionHelper { + + private ExtensionHelper() { + } + + /** + * If the passed in extension class implements {@link BoundDTExtension} + * then it will have its {@link BoundDTExtension#bind(URI, Configuration)} + * method called. + * @param extension extension to examine and maybe invoke + * @param uri URI of the filesystem. + * @param conf configuration of this extension. + * @throws IOException failure during binding. + */ + public static void bind(Object extension, URI uri, Configuration conf) + throws IOException { + if (extension instanceof BoundDTExtension) { + ((BoundDTExtension) extension).bind(uri, conf); + } + } + + /** + * Close an extension if it is closeable. + * Any error raised is caught and logged. + * @param extension extension instance. + */ + public static void close(Object extension) { + ifBoundDTExtension(extension, + v -> { + IOUtils.closeStreams(v); + return null; + }); + } + + /** + * Invoke {@link BoundDTExtension#getUserAgentSuffix()} or + * return the default value. + * @param extension extension to invoke + * @param def default if the class is of the wrong type. + * @return a user agent suffix + */ + public static String getUserAgentSuffix(Object extension, String def) { + return ifBoundDTExtension(extension, BoundDTExtension::getUserAgentSuffix) + .orElse(def); + } + + /** + * Invoke {@link BoundDTExtension#getCanonicalServiceName()} or + * return the default value. + * @param extension extension to invoke + * @param def default if the class is of the wrong type. + * @return a canonical service name. + */ + public static String getCanonicalServiceName(Object extension, String def) { + return ifBoundDTExtension(extension, BoundDTExtension::getCanonicalServiceName) + .orElse(def); + } + + /** + * Invoke an operation on an object if it implements the BoundDTExtension + * interface; returns an optional value. + * @param extension the extension to invoke. + * @param fn function to apply + * @param return type of te function. + * @return an optional value which, if not empty, contains the return value + * of the invoked function. If empty: the object was not of a compatible + * type. + */ + public static Optional ifBoundDTExtension(Object extension, + Function fn) { + if (extension instanceof BoundDTExtension) { + return Optional.of((BoundDTExtension) extension).map(fn); + } else { + return Optional.empty(); + } + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java index df7b1990f3..39fba83f8a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.services.AbfsIoUtils; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; /** @@ -165,8 +166,14 @@ public static AzureADToken getTokenUsingRefreshToken(String clientId, * failed to get the Azure Active Directory token. */ public static class HttpException extends IOException { - private int httpErrorCode; - private String requestId; + private final int httpErrorCode; + private final String requestId; + + private final String url; + + private final String contentType; + + private final String body; /** * Gets Http error status code. @@ -184,11 +191,63 @@ public String getRequestId() { return this.requestId; } - HttpException(int httpErrorCode, String requestId, String message) { + HttpException( + final int httpErrorCode, + final String requestId, + final String message, + final String url, + final String contentType, + final String body) { super(message); this.httpErrorCode = httpErrorCode; this.requestId = requestId; + this.url = url; + this.contentType = contentType; + this.body = body; } + + public String getUrl() { + return url; + } + + public String getContentType() { + return contentType; + } + + public String getBody() { + return body; + } + + @Override + public String getMessage() { + final StringBuilder sb = new StringBuilder(); + sb.append("HTTP Error "); + sb.append(httpErrorCode); + sb.append("; url='").append(url).append('\''); + sb.append(' '); + sb.append(super.getMessage()); + sb.append("; requestId='").append(requestId).append('\''); + sb.append("; contentType='").append(contentType).append('\''); + sb.append("; response '").append(body).append('\''); + return sb.toString(); + } + } + + /** + * An unexpected HTTP response was raised, such as text coming back + * from what should be an OAuth endpoint. + */ + public static class UnexpectedResponseException extends HttpException { + + public UnexpectedResponseException(final int httpErrorCode, + final String requestId, + final String message, + final String url, + final String contentType, + final String body) { + super(httpErrorCode, requestId, message, url, contentType, body); + } + } private static AzureADToken getTokenCall(String authEndpoint, String body, @@ -236,6 +295,8 @@ private static AzureADToken getTokenSingleCall( } try { + LOG.debug("Requesting an OAuth token by {} to {}", + httpMethod, authEndpoint); URL url = new URL(urlString); conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(httpMethod); @@ -248,13 +309,18 @@ private static AzureADToken getTokenSingleCall( } } conn.setRequestProperty("Connection", "close"); - + AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", + conn.getRequestProperties()); if (httpMethod.equals("POST")) { conn.setDoOutput(true); conn.getOutputStream().write(payload.getBytes("UTF-8")); } int httpResponseCode = conn.getResponseCode(); + LOG.debug("Response {}", httpResponseCode); + AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", + conn.getHeaderFields()); + String requestId = conn.getHeaderField("x-ms-request-id"); String responseContentType = conn.getHeaderField("Content-Type"); long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0); @@ -265,23 +331,49 @@ private static AzureADToken getTokenSingleCall( InputStream httpResponseStream = conn.getInputStream(); token = parseTokenFromStream(httpResponseStream); } else { - String responseBody = consumeInputStream(conn.getErrorStream(), 1024); + InputStream stream = conn.getErrorStream(); + if (stream == null) { + // no error stream, try the original input stream + stream = conn.getInputStream(); + } + String responseBody = consumeInputStream(stream, 1024); String proxies = "none"; String httpProxy = System.getProperty("http.proxy"); String httpsProxy = System.getProperty("https.proxy"); if (httpProxy != null || httpsProxy != null) { proxies = "http:" + httpProxy + "; https:" + httpsProxy; } - String logMessage = - "AADToken: HTTP connection failed for getting token from AzureAD. Http response: " - + httpResponseCode + " " + conn.getResponseMessage() - + "\nContent-Type: " + responseContentType - + " Content-Length: " + responseContentLength - + " Request ID: " + requestId.toString() + String operation = "AADToken: HTTP connection to " + authEndpoint + + " failed for getting token from AzureAD."; + String logMessage = operation + + " HTTP response: " + httpResponseCode + + " " + conn.getResponseMessage() + " Proxies: " + proxies - + "\nFirst 1K of Body: " + responseBody; + + (responseBody.isEmpty() + ? "" + : ("\nFirst 1K of Body: " + responseBody)); LOG.debug(logMessage); - throw new HttpException(httpResponseCode, requestId, logMessage); + if (httpResponseCode == HttpURLConnection.HTTP_OK) { + // 200 is returned by some of the sign-on pages, but can also + // come from proxies, utterly wrong URLs, etc. + throw new UnexpectedResponseException(httpResponseCode, + requestId, + operation + + " Unexpected response." + + " Check configuration, URLs and proxy settings." + + " proxies=" + proxies, + authEndpoint, + responseContentType, + responseBody); + } else { + // general HTTP error + throw new HttpException(httpResponseCode, + requestId, + operation, + authEndpoint, + responseContentType, + responseBody); + } } } finally { if (conn != null) { @@ -330,6 +422,10 @@ private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) } private static String consumeInputStream(InputStream inStream, int length) throws IOException { + if (inStream == null) { + // the HTTP request returned an empty body + return ""; + } byte[] b = new byte[length]; int totalBytesRead = 0; int bytesRead = 0; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java index 6e9f6350a1..37cfa6f1d2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java @@ -20,18 +20,23 @@ import java.io.IOException; +import java.net.URI; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension; import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee; +import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; /** * Provides tokens based on custom implementation, following the Adapter Design * Pattern. */ -public final class CustomTokenProviderAdapter extends AccessTokenProvider { +public final class CustomTokenProviderAdapter extends AccessTokenProvider + implements BoundDTExtension { private CustomTokenProviderAdaptee adaptee; private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); @@ -55,4 +60,40 @@ protected AzureADToken refreshToken() throws IOException { return azureADToken; } -} \ No newline at end of file + + /** + * Bind to the filesystem by passing the binding call on + * to any custom token provider adaptee which implements + * {@link BoundDTExtension}. + * No-op if they don't. + * @param fsURI URI of the filesystem. + * @param conf configuration of this extension. + * @throws IOException failure. + */ + @Override + public void bind(final URI fsURI, + final Configuration conf) + throws IOException { + ExtensionHelper.bind(adaptee, fsURI, conf); + } + + @Override + public void close() { + ExtensionHelper.close(adaptee); + } + + /** + * Get a suffix for the UserAgent suffix of HTTP requests, which + * can be used to identify the principal making ABFS requests. + * + * If the adaptee is a BoundDTExtension, it is queried for a UA Suffix; + * otherwise "" is returned. + * + * @return an empty string, or a key=value string to be added to the UA + * header. + */ + public String getUserAgentSuffix() { + String suffix = ExtensionHelper.getUserAgentSuffix(adaptee, ""); + return suffix != null ? suffix : ""; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java index 390c2f4031..7272c13297 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java @@ -24,8 +24,17 @@ /** * Delegation token Identifier for ABFS delegation tokens. + * The token kind from {@link #getKind()} is {@link #TOKEN_KIND}, always. + * + * Subclasses have to very careful when looking up tokens (which will of + * course be registered in the credentials as of this kind), in case the + * incoming credentials are actually of a different subtype. */ public class AbfsDelegationTokenIdentifier extends DelegationTokenIdentifier { + + /** + * The token kind of these tokens: ""ABFS delegation". + */ public static final Text TOKEN_KIND = new Text("ABFS delegation"); public AbfsDelegationTokenIdentifier(){ @@ -41,6 +50,13 @@ public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer, super(kind, owner, renewer, realUser); } + /** + * Get the token kind. + * Returns {@link #TOKEN_KIND} always. + * If a subclass does not want its renew/cancel process to be managed + * by {@link AbfsDelegationTokenManager}, this must be overridden. + * @return the kind of the token. + */ @Override public Text getKind() { return TOKEN_KIND; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java index eb47f768a4..c8d6b803f4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java @@ -19,28 +19,45 @@ package org.apache.hadoop.fs.azurebfs.security; +import java.io.Closeable; import java.io.IOException; +import java.net.URI; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension; import org.apache.hadoop.fs.azurebfs.extensions.CustomDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; /** * Class for delegation token Manager. + * + * Instantiates the class declared in + * {@link ConfigurationKeys#FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE} and + * issues tokens from it. */ -public class AbfsDelegationTokenManager { +public class AbfsDelegationTokenManager implements BoundDTExtension { private CustomDelegationTokenManager tokenManager; private static final Logger LOG = LoggerFactory.getLogger(AbfsDelegationTokenManager.class); + /** + * Create the custom delegation token manager and call its + * {@link CustomDelegationTokenManager#initialize(Configuration)} method. + * @param conf configuration + * @throws IOException failure during initialization. + * @throws RuntimeException classloading problems. + */ public AbfsDelegationTokenManager(final Configuration conf) throws IOException { Preconditions.checkNotNull(conf, "conf"); @@ -54,23 +71,75 @@ public AbfsDelegationTokenManager(final Configuration conf) throws IOException { "The value for \"fs.azure.delegation.token.provider.type\" is not defined."); } - CustomDelegationTokenManager customTokenMgr = (CustomDelegationTokenManager) ReflectionUtils + CustomDelegationTokenManager customTokenMgr = ReflectionUtils .newInstance(customDelegationTokenMgrClass, conf); - if (customTokenMgr == null) { - throw new IllegalArgumentException(String.format("Failed to initialize %s.", customDelegationTokenMgrClass)); - } - + Preconditions.checkArgument(customTokenMgr != null, + "Failed to initialize %s.", customDelegationTokenMgrClass); customTokenMgr.initialize(conf); - tokenManager = customTokenMgr; } + /** + * Bind to a filesystem instance by passing the binding information down + * to any token manager which implements {@link BoundDTExtension}. + * + * This is not invoked before renew or cancel operations, but is guaranteed + * to be invoked before calls to {@link #getDelegationToken(String)}. + * @param fsURI URI of the filesystem. + * @param conf configuration of this extension. + * @throws IOException bind failure. + */ + @Override + public void bind(final URI fsURI, final Configuration conf) + throws IOException { + Preconditions.checkNotNull(fsURI, "Np Filesystem URI"); + ExtensionHelper.bind(tokenManager, fsURI, conf); + } + + /** + * Query the token manager for the service name; if it does not implement + * the extension interface, null is returned. + * @return the canonical service name. + */ + @Override + public String getCanonicalServiceName() { + return ExtensionHelper.getCanonicalServiceName(tokenManager, null); + } + + /** + * Close. + * If the token manager is closeable, it has its {@link Closeable#close()} + * method (quietly) invoked. + */ + @Override + public void close() { + if (tokenManager instanceof Closeable) { + IOUtils.cleanupWithLogger(LOG, (Closeable) tokenManager); + } + } + + /** + * Get a delegation token by invoking + * {@link CustomDelegationTokenManager#getDelegationToken(String)}. + * If the token returned already has a Kind; that is used. + * If not, then the token kind is set to + * {@link AbfsDelegationTokenIdentifier#TOKEN_KIND}, which implicitly + * resets any token renewer class. + * @param renewer the principal permitted to renew the token. + * @return a token for the filesystem. + * @throws IOException failure. + */ public Token getDelegationToken( String renewer) throws IOException { + LOG.debug("Requesting Delegation token for {}", renewer); Token token = tokenManager.getDelegationToken(renewer); - token.setKind(AbfsDelegationTokenIdentifier.TOKEN_KIND); + if (token.getKind() == null) { + // if a token type is not set, use the default. + // note: this also sets the renewer to null. + token.setKind(AbfsDelegationTokenIdentifier.TOKEN_KIND); + } return token; } @@ -85,4 +154,18 @@ public void cancelDelegationToken(Token token) tokenManager.cancelDelegationToken(token); } + + @VisibleForTesting + public CustomDelegationTokenManager getTokenManager() { + return tokenManager; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "AbfsDelegationTokenManager{"); + sb.append("tokenManager=").append(tokenManager); + sb.append('}'); + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDtFetcher.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDtFetcher.java new file mode 100644 index 0000000000..db3e866da0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDtFetcher.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.security; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.DtFetcher; +import org.apache.hadoop.security.token.Token; + +/** + * A DT fetcher for Abfs. + * This is a copy-and-paste of + * {@code org.apache.hadoop.hdfs.HdfsDtFetcher}. + * + * It is needed for the `hadoop dtutil` command. + */ +public class AbfsDtFetcher implements DtFetcher { + + private static final String FETCH_FAILED = + "Filesystem not generating Delegation Tokens"; + + /** + * Returns the service name for the scheme.. + */ + public Text getServiceName() { + return new Text(getScheme()); + } + + /** + * Get the scheme for this specific fetcher. + * @return a scheme. + */ + protected String getScheme() { + return FileSystemUriSchemes.ABFS_SCHEME; + } + + public boolean isTokenRequired() { + return UserGroupInformation.isSecurityEnabled(); + } + + /** + * Returns Token object via FileSystem, null if bad argument. + * @param conf - a Configuration object used with FileSystem.get() + * @param creds - a Credentials object to which token(s) will be added + * @param renewer - the renewer to send with the token request + * @param url - the URL to which the request is sent + * @return a Token, or null if fetch fails. + */ + public Token addDelegationTokens(Configuration conf, + Credentials creds, + String renewer, + String url) throws Exception { + if (!url.startsWith(getServiceName().toString())) { + url = getServiceName().toString() + "://" + url; + } + FileSystem fs = FileSystem.get(URI.create(url), conf); + Token token = fs.getDelegationToken(renewer); + if (token == null) { + throw new IOException(FETCH_FAILED + ": " + url); + } + creds.addToken(token.getService(), token); + return token; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsTokenRenewer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsTokenRenewer.java index ab51838f26..2315b3ec70 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsTokenRenewer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsTokenRenewer.java @@ -30,6 +30,8 @@ /** * Token Renewer for renewing ABFS delegation tokens with remote service. + * + * Handles tokens of kind {@link AbfsDelegationTokenIdentifier#TOKEN_KIND}. */ public class AbfsTokenRenewer extends TokenRenewer { public static final Logger LOG = @@ -93,4 +95,4 @@ private AbfsDelegationTokenManager getInstance(Configuration conf) throws IOException { return new AbfsDelegationTokenManager(conf); } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfssDtFetcher.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfssDtFetcher.java new file mode 100644 index 0000000000..b74c02b67c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfssDtFetcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.security; + +import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; + +/** + * The DT Fetcher for abfss. + */ +public class AbfssDtFetcher extends AbfsDtFetcher { + + /** + * Get the scheme for this specific fetcher. + * @return a scheme. + */ + protected String getScheme() { + return FileSystemUriSchemes.ABFS_SECURE_SCHEME; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/package-info.java index 7c3e37ae6e..36a87d6068 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/package-info.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/package-info.java @@ -16,7 +16,7 @@ * limitations under the License. */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("authorization-subsystems") @InterfaceStability.Unstable package org.apache.hadoop.fs.azurebfs.security; import org.apache.hadoop.classification.InterfaceAudience; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 180870c042..ea665bd1f6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; @@ -37,8 +38,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; +import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.io.IOUtils; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; @@ -48,7 +51,7 @@ /** * AbfsClient. */ -public class AbfsClient { +public class AbfsClient implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; @@ -87,6 +90,13 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent this.tokenProvider = tokenProvider; } + @Override + public void close() throws IOException { + if (tokenProvider instanceof Closeable) { + IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); + } + } + public String getFileSystem() { return filesystem; } @@ -571,6 +581,11 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration, sb.append("; "); sb.append(sslProviderName); } + String tokenProviderField = + ExtensionHelper.getUserAgentSuffix(tokenProvider, ""); + if (!tokenProviderField.isEmpty()) { + sb.append("; ").append(tokenProviderField); + } sb.append(")"); final String userAgentComment = sb.toString(); String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix(); @@ -578,7 +593,7 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration, return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId); } - return String.format(CLIENT_VERSION + " %s", userAgentComment); + return String.format(Locale.ROOT, CLIENT_VERSION + " %s", userAgentComment); } @VisibleForTesting diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index de38b34724..78e1afd6b7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -268,6 +268,9 @@ public void processResponse(final byte[] buffer, final int offset, final int len if (this.requestId == null) { this.requestId = AbfsHttpConstants.EMPTY_STRING; } + // dump the headers + AbfsIoUtils.dumpHeadersToDebugLog("Response Headers", + connection.getHeaderFields()); if (AbfsHttpConstants.HTTP_METHOD_HEAD.equals(this.method)) { // If it is HEAD, and it is ERROR diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java new file mode 100644 index 0000000000..be2dcc54ed --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; + +/** + * Utility classes to work with the remote store. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class AbfsIoUtils { + + private static final Logger LOG = LoggerFactory.getLogger(AbfsIoUtils.class); + + private AbfsIoUtils() { + } + + /** + * Dump the headers of a request/response to the log at DEBUG level. + * @param origin header origin for log + * @param headers map of headers. + */ + public static void dumpHeadersToDebugLog(final String origin, + final Map> headers) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}", origin); + for (Map.Entry> entry : headers.entrySet()) { + String key = entry.getKey(); + if (key == null) { + key = "HTTP Response"; + } + String values = StringUtils.join(";", entry.getValue()); + if (key.contains("Cookie")) { + values = "*cookie info*"; + } + LOG.debug(" {}={}", + key, + values); + } + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index fa8f742cfd..4196c10ed4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -149,15 +149,19 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS // sign the HTTP request if (client.getAccessToken() == null) { + LOG.debug("Signing request with shared key"); // sign the HTTP request client.getSharedKeyCredentials().signRequest( httpOperation.getConnection(), hasRequestBody ? bufferLength : 0); } else { + LOG.debug("Authenticating request with OAuth2 access token"); httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, client.getAccessToken()); } - + // dump the headers + AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", + httpOperation.getConnection().getRequestProperties()); AbfsClientThrottlingIntercept.sendingRequest(operationType); if (hasRequestBody) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java index 9ab9e50450..5f54673d7a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java @@ -39,6 +39,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.utils.Base64; @@ -48,6 +51,9 @@ * account. */ public class SharedKeyCredentials { + + private static final Logger LOG = LoggerFactory.getLogger( + SharedKeyCredentials.class); private static final int EXPECTED_BLOB_QUEUE_CANONICALIZED_STRING_LENGTH = 300; private static final Pattern CRLF = Pattern.compile("\r\n", Pattern.LITERAL); private static final String HMAC_SHA256 = "HmacSHA256"; @@ -76,14 +82,19 @@ public SharedKeyCredentials(final String accountName, public void signRequest(HttpURLConnection connection, final long contentLength) throws UnsupportedEncodingException { - connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, getGMTTime()); + String gmtTime = getGMTTime(); + connection.setRequestProperty(HttpHeaderConfigurations.X_MS_DATE, gmtTime); final String stringToSign = canonicalize(connection, accountName, contentLength); final String computedBase64Signature = computeHmac256(stringToSign); + String signature = String.format("%s %s:%s", "SharedKey", accountName, + computedBase64Signature); connection.setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, - String.format("%s %s:%s", "SharedKey", accountName, computedBase64Signature)); + signature); + LOG.debug("Signing request with timestamp of {} and signature {}", + gmtTime, signature); } private String computeHmac256(final String stringToSign) { diff --git a/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher new file mode 100644 index 0000000000..633df7eff4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/resources/META-INF/services/org.apache.hadoop.security.token.DtFetcher @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.hadoop.fs.azurebfs.security.AbfsDtFetcher +org.apache.hadoop.fs.azurebfs.security.AbfssDtFetcher diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index db55e67b76..02a62c855f 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -16,67 +16,780 @@ -## Introduction +## Introduction The `hadoop-azure` module provides support for the Azure Data Lake Storage Gen2 storage layer through the "abfs" connector -To make it part of Apache Hadoop's default classpath, simply make sure that -`HADOOP_OPTIONAL_TOOLS` in `hadoop-env.sh` has `hadoop-azure` in the list. +To make it part of Apache Hadoop's default classpath, make sure that +`HADOOP_OPTIONAL_TOOLS` environment variable has `hadoop-azure` in the list, +*on every machine in the cluster* -## Features +```bash +export HADOOP_OPTIONAL_TOOLS=hadoop-azure +``` -* Read and write data stored in an Azure Blob Storage account. +You can set this locally in your `.profile`/`.bashrc`, but note it won't +propagate to jobs running in-cluster. + + +## Features of the ABFS connector. + +* Supports reading and writing data stored in an Azure Blob Storage account. * *Fully Consistent* view of the storage across all clients. -* Can read data written through the wasb: connector. -* Present a hierarchical file system view by implementing the standard Hadoop +* Can read data written through the `wasb:` connector. +* Presents a hierarchical file system view by implementing the standard Hadoop [`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface. * Supports configuration of multiple Azure Blob Storage accounts. -* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, Apache Spark -* Tested at scale on both Linux and Windows. +* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, Apache Spark. +* Tested at scale on both Linux and Windows by Microsoft themselves. * Can be used as a replacement for HDFS on Hadoop clusters deployed in Azure infrastructure. - - -## Limitations - -* File last access time is not tracked. - - -## Technical notes - -### Security - -### Consistency and Concurrency - -*TODO*: complete/review - -The abfs client has a fully consistent view of the store, which has complete Create Read Update and Delete consistency for data and metadata. -(Compare and contrast with S3 which only offers Create consistency; S3Guard adds CRUD to metadata, but not the underlying data). - -### Performance - -*TODO*: check these. - -* File Rename: `O(1)`. -* Directory Rename: `O(files)`. -* Directory Delete: `O(files)`. - -## Configuring ABFS - -Any configuration can be specified generally (or as the default when accessing all accounts) or can be tied to s a specific account. -For example, an OAuth identity can be configured for use regardless of which account is accessed with the property -"fs.azure.account.oauth2.client.id" -or you can configure an identity to be used only for a specific storage account with -"fs.azure.account.oauth2.client.id.\.dfs.core.windows.net". - -Note that it doesn't make sense to do this with some properties, like shared keys that are inherently account-specific. - -## Testing ABFS - -See the relevant section in [Testing Azure](testing_azure.html). - -## References +For details on ABFS, consult the following documents: * [A closer look at Azure Data Lake Storage Gen2](https://azure.microsoft.com/en-gb/blog/a-closer-look-at-azure-data-lake-storage-gen2/); MSDN Article from June 28, 2018. +* [Storage Tiers](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-storage-tiers) + +## Getting started + +### Concepts + +The Azure Storage data model presents 3 core concepts: + +* **Storage Account**: All access is done through a storage account. +* **Container**: A container is a grouping of multiple blobs. A storage account + may have multiple containers. In Hadoop, an entire file system hierarchy is + stored in a single container. +* **Blob**: A file of any type and size stored with the existing wasb connector + +The ABFS connector connects to classic containers, or those created +with Hierarchical Namespaces. + +## Hierarchical Namespaces (and WASB Compatibility) + +A key aspect of ADLS Gen 2 is its support for +[hierachical namespaces](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-namespace) +These are effectively directories and offer high performance rename and delete operations +—something which makes a significant improvement in performance in query engines +writing data to, including MapReduce, Spark, Hive, as well as DistCp. + +This feature is only available if the container was created with "namespace" +support. + +You enable namespace support when creating a new Storage Account, +by checking the "Hierarchical Namespace" option in the Portal UI, or, when +creating through the command line, using the option `--hierarchical-namespace true` + +_You cannot enable Hierarchical Namespaces on an existing storage account_ + +Containers in a storage account with Hierarchical Namespaces are +not (currently) readable through the `wasb:` connector. + +Some of the `az storage` command line commands fail too, for example: + +```bash +$ az storage container list --account-name abfswales1 +Blob API is not yet supported for hierarchical namespace accounts. ErrorCode: BlobApiNotYetSupportedForHierarchicalNamespaceAccounts +``` + +### Creating an Azure Storage Account + +The best documentation on getting started with Azure Datalake Gen2 with the +abfs connector is [Using Azure Data Lake Storage Gen2 with Azure HDInsight clusters](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-use-hdi-cluster) + +It includes instructions to create it from [the Azure command line tool](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest), +which can be installed on Windows, MacOS (via Homebrew) and Linux (apt or yum). + +The [az storage](https://docs.microsoft.com/en-us/cli/azure/storage?view=azure-cli-latest) subcommand +handles all storage commands, [`az storage account create`](https://docs.microsoft.com/en-us/cli/azure/storage/account?view=azure-cli-latest#az-storage-account-create) +does the creation. + +Until the ADLS gen2 API support is finalized, you need to add an extension +to the ADLS command. +```bash +az extension add --name storage-preview +``` + +Check that all is well by verifying that the usage command includes `--hierarchical-namespace`: +``` +$ az storage account +usage: az storage account create [-h] [--verbose] [--debug] + [--output {json,jsonc,table,tsv,yaml,none}] + [--query JMESPATH] --resource-group + RESOURCE_GROUP_NAME --name ACCOUNT_NAME + [--sku {Standard_LRS,Standard_GRS,Standard_RAGRS,Standard_ZRS,Premium_LRS,Premium_ZRS}] + [--location LOCATION] + [--kind {Storage,StorageV2,BlobStorage,FileStorage,BlockBlobStorage}] + [--tags [TAGS [TAGS ...]]] + [--custom-domain CUSTOM_DOMAIN] + [--encryption-services {blob,file,table,queue} [{blob,file,table,queue} ...]] + [--access-tier {Hot,Cool}] + [--https-only [{true,false}]] + [--file-aad [{true,false}]] + [--hierarchical-namespace [{true,false}]] + [--bypass {None,Logging,Metrics,AzureServices} [{None,Logging,Metrics,AzureServices} ...]] + [--default-action {Allow,Deny}] + [--assign-identity] + [--subscription _SUBSCRIPTION] +``` + +You can list locations from `az account list-locations`, which lists the +name to refer to in the `--location` argument: +``` +$ az account list-locations -o table + +DisplayName Latitude Longitude Name +------------------- ---------- ----------- ------------------ +East Asia 22.267 114.188 eastasia +Southeast Asia 1.283 103.833 southeastasia +Central US 41.5908 -93.6208 centralus +East US 37.3719 -79.8164 eastus +East US 2 36.6681 -78.3889 eastus2 +West US 37.783 -122.417 westus +North Central US 41.8819 -87.6278 northcentralus +South Central US 29.4167 -98.5 southcentralus +North Europe 53.3478 -6.2597 northeurope +West Europe 52.3667 4.9 westeurope +Japan West 34.6939 135.5022 japanwest +Japan East 35.68 139.77 japaneast +Brazil South -23.55 -46.633 brazilsouth +Australia East -33.86 151.2094 australiaeast +Australia Southeast -37.8136 144.9631 australiasoutheast +South India 12.9822 80.1636 southindia +Central India 18.5822 73.9197 centralindia +West India 19.088 72.868 westindia +Canada Central 43.653 -79.383 canadacentral +Canada East 46.817 -71.217 canadaeast +UK South 50.941 -0.799 uksouth +UK West 53.427 -3.084 ukwest +West Central US 40.890 -110.234 westcentralus +West US 2 47.233 -119.852 westus2 +Korea Central 37.5665 126.9780 koreacentral +Korea South 35.1796 129.0756 koreasouth +France Central 46.3772 2.3730 francecentral +France South 43.8345 2.1972 francesouth +Australia Central -35.3075 149.1244 australiacentral +Australia Central 2 -35.3075 149.1244 australiacentral2 +``` + +Once a location has been chosen, create the account +```bash + +az storage account create --verbose \ + --name abfswales1 \ + --resource-group devteam2 \ + --kind StorageV2 \ + --hierarchical-namespace true \ + --location ukwest \ + --sku Standard_LRS \ + --https-only true \ + --encryption-services blob \ + --access-tier Hot \ + --tags owner=engineering \ + --assign-identity \ + --output jsonc +``` + +The output of the command is a JSON file, whose `primaryEndpoints` command +includes the name of the store endpoint: +```json +{ + "primaryEndpoints": { + "blob": "https://abfswales1.blob.core.windows.net/", + "dfs": "https://abfswales1.dfs.core.windows.net/", + "file": "https://abfswales1.file.core.windows.net/", + "queue": "https://abfswales1.queue.core.windows.net/", + "table": "https://abfswales1.table.core.windows.net/", + "web": "https://abfswales1.z35.web.core.windows.net/" + } +} +``` + +The `abfswales1.dfs.core.windows.net` account is the name by which the +storage account will be referred to. + +Now ask for the connection string to the store, which contains the account key +```bash +az storage account show-connection-string --name abfswales1 +{ + "connectionString": "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=abfswales1;AccountKey=ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA==" +} +``` + +You then need to add the access key to your `core-site.xml`, JCEKs file or +use your cluster management tool to set it the option `fs.azure.account.key.STORAGE-ACCOUNT` +to this value. +```XML + + fs.azure.account.key.abfswales1.dfs.core.windows.net + ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA== + +``` + +#### Creation through the Azure Portal + +Creation through the portal is covered in [Quickstart: Create an Azure Data Lake Storage Gen2 storage account](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-quickstart-create-account) + +Key Steps + +1. Create a new Storage Account in a location which suits you. +1. "Basics" Tab: select "StorageV2". +1. "Advanced" Tab: enable "Hierarchical Namespace". + +You have now created your storage account. Next, get the key for authentication +for using the default "Shared Key" authentication. + +1. Go to the Azure Portal. +1. Select "Storage Accounts" +1. Select the newly created storage account. +1. In the list of settings, locate "Access Keys" and select that. +1. Copy one of the access keys to the clipboard, add to the XML option, +set in cluster management tools, Hadoop JCEKS file or KMS store. + +### Creating a new container + +An Azure storage account can have multiple containers, each with the container +name as the userinfo field of the URI used to reference it. + +For example, the container "container1" in the storage account just created +will have the URL `abfs://container1@abfswales1.dfs.core.windows.net/` + + +You can create a new container through the ABFS connector, by setting the option + `fs.azure.createRemoteFileSystemDuringInitialization` to `true`. + +If the container does not exist, an attempt to list it with `hadoop fs -ls` +will fail + +``` +$ hadoop fs -ls abfs://container1@abfswales1.dfs.core.windows.net/ + +ls: `abfs://container1@abfswales1.dfs.core.windows.net/': No such file or directory +``` + +Enable remote FS creation and the second attempt succeeds, creating the container as it does so: + +``` +$ hadoop fs -D fs.azure.createRemoteFileSystemDuringInitialization=true \ + -ls abfs://container1@abfswales1.dfs.core.windows.net/ +``` + +This is useful for creating accounts on the command line, especially before +the `az storage` command supports hierarchical namespaces completely. + + +### Listing and examining containers of a Storage Account. + +You can use the [Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/) + +## Configuring ABFS + +Any configuration can be specified generally (or as the default when accessing all accounts) +or can be tied to a specific account. +For example, an OAuth identity can be configured for use regardless of which +account is accessed with the property `fs.azure.account.oauth2.client.id` +or you can configure an identity to be used only for a specific storage account with +`fs.azure.account.oauth2.client.id..dfs.core.windows.net`. + +This is shown in the Authentication section. + +## Authentication + +Authentication for ABFS is ultimately granted by [Azure Active Directory](https://docs.microsoft.com/en-us/azure/active-directory/develop/authentication-scenarios). + +The concepts covered there are beyond the scope of this document to cover; +developers are expected to have read and understood the concepts therein +to take advantage of the different authentication mechanisms. + +What is covered here, briefly, is how to configure the ABFS client to authenticate +in different deployment situations. + +The ABFS client can be deployed in different ways, with its authentication needs +driven by them. + +1. With the storage account's authentication secret in the configuration: +"Shared Key". +1. Using OAuth 2.0 tokens of one form or another. +1. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application, + "Managed Instance". + +What can be changed is what secrets/credentials are used to authenticate the caller. + +The authentication mechanism is set in `fs.azure.account.auth.type` (or the account specific variant), +and, for the various OAuth options `fs.azure.account.oauth.provider.type` + +All secrets can be stored in JCEKS files. These are encrypted and password +protected —use them or a compatible Hadoop Key Management Store wherever +possible + +### Default: Shared Key + +This is the simplest authentication mechanism of account + password. + +The account name is inferred from the URL; +the password, "key", retrieved from the XML/JCECKs configuration files. + +```xml + + fs.azure.account.auth.type.abfswales1.dfs.core.windows.net + SharedKey + + + + + fs.azure.account.key.abfswales1.dfs.core.windows.net + ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA== + + The secret password. Never share these. + + +``` + +*Note*: The source of the account key can be changed through a custom key provider; +one exists to execute a shell script to retrieve it. + +### OAuth 2.0 Client Credentials + +OAuth 2.0 credentials of (client id, client secret, endpoint) are provided in the configuration/JCEKS file. + +The specifics of this process is covered +in [hadoop-azure-datalake](../hadoop-azure-datalake/index.html#Configuring_Credentials_and_FileSystem); +the key names are slightly different here. + +```xml + + fs.azure.account.auth.type + OAuth + + Use OAuth authentication + + + + fs.azure.account.oauth.provider.type + org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider + + Use client credentials + + + + fs.azure.account.oauth2.client.endpoint + + + URL of OAuth endpoint + + + + fs.azure.account.oauth2.client.id + + + Client ID + + + + fs.azure.account.oauth2.client.secret + + + Secret + + +``` + +### OAuth 2.0: Username and Password + +An OAuth 2.0 endpoint, username and password are provided in the configuration/JCEKS file. + +```xml + + fs.azure.account.auth.type + OAuth + + Use OAuth authentication + + + + fs.azure.account.oauth.provider.type + org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider + + Use user and password + + + + fs.azure.account.oauth2.client.endpoint + + + URL of OAuth 2.0 endpoint + + + + fs.azure.account.oauth2.user.name + + + username + + + + fs.azure.account.oauth2.user.password + + + password for account + + +``` + +### OAuth 2.0: Refresh Token + +With an existing Oauth 2.0 token, make a request of the Active Directory endpoint +`https://login.microsoftonline.com/Common/oauth2/token` for this token to be refreshed. + +```xml + + fs.azure.account.auth.type + OAuth + + Use OAuth 2.0 authentication + + + + fs.azure.account.oauth.provider.type + org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider + + Use the Refresh Token Provider + + + + fs.azure.account.oauth2.refresh.token + + + Refresh token + + + + fs.azure.account.oauth2.client.id + + + Optional Client ID + + +``` + +### Azure Managed Identity + +[Azure Managed Identities](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview), formerly "Managed Service Identities". + +OAuth 2.0 tokens are issued by a special endpoint only accessible +from the executing VM (`http://169.254.169.254/metadata/identity/oauth2/token`). +The issued credentials can be used to authenticate. + +The Azure Portal/CLI is used to create the service identity. + +```xml + + fs.azure.account.auth.type + OAuth + + Use OAuth authentication + + + + fs.azure.account.oauth.provider.type + org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider + + Use MSI for issuing OAuth tokens + + + + fs.azure.account.oauth2.msi.tenant + + + Optional MSI Tenant ID + + + + fs.azure.account.oauth2.client.id + + + Optional Client ID + + +``` + +### Custom OAuth 2.0 Token Provider + +A Custom OAuth 2.0 token provider supplies the ABFS connector with an OAuth 2.0 +token when its `getAccessToken()` method is invoked. + +```xml + + fs.azure.account.auth.type + Custom + + Custom Authentication + + + + fs.azure.account.oauth.provider.type + + + classname of Custom Authentication Provider + + +``` + +The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee` +and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`. + +## Technical notes + +### Proxy setup + +The connector uses the JVM proxy settings to control its proxy setup. + +See The [Oracle Java documentation](https://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html) for the options to set. + +As the connector uses HTTPS by default, the `https.proxyHost` and `https.proxyPort` +options are those which must be configured. + +In MapReduce jobs, including distcp, the proxy options must be set in both the +`mapreduce.map.java.opts` and `mapreduce.reduce.java.opts`. + +```bash +# this variable is only here to avoid typing the same values twice. +# It's name is not important. +export DISTCP_PROXY_OPTS="-Dhttps.proxyHost=web-proxy.example.com -Dhttps.proxyPort=80" + +hadoop distcp \ + -D mapreduce.map.java.opts="$DISTCP_PROXY_OPTS" \ + -D mapreduce.reduce.java.opts="$DISTCP_PROXY_OPTS" \ + -update -skipcrccheck -numListstatusThreads 40 \ + hdfs://namenode:8020/users/alice abfs://backups@account.dfs.core.windows.net/users/alice +``` + +Without these settings, even though access to ADLS may work from the command line, +`distcp` access can fail with network errors. + +### Security + +As with other object stores, login secrets are valuable pieces of information. +Organizations should have a process for safely sharing them. + +### Limitations of the ABFS connector + +* File last access time is not tracked. +* Extended attributes are not supported. +* File Checksums are not supported. +* The `Syncable` interfaces `hsync()` and `hflush()` operations are supported if +`fs.azure.enable.flush` is set to true (default=true). With the Wasb connector, +this limited the number of times either call could be made to 50,000 +[HADOOP-15478](https://issues.apache.org/jira/browse/HADOOP-15478). +If abfs has the a similar limit, then excessive use of sync/flush may +cause problems. + +### Consistency and Concurrency + +As with all Azure storage services, the Azure Datalake Gen 2 store offers +a fully consistent view of the store, with complete +Create, Read, Update, and Delete consistency for data and metadata. +(Compare and contrast with S3 which only offers Create consistency; +S3Guard adds CRUD to metadata, but not the underlying data). + +### Performance and Scalability + +For containers with hierarchical namespaces, +the scalability numbers are, in Big-O-notation, as follows: + +| Operation | Scalability | +|-----------|-------------| +| File Rename | `O(1)` | +| File Delete | `O(1)` | +| Directory Rename:| `O(1)` | +| Directory Delete | `O(1)` | + +For non-namespace stores, the scalability becomes: + +| Operation | Scalability | +|-----------|-------------| +| File Rename | `O(1)` | +| File Delete | `O(1)` | +| Directory Rename:| `O(files)` | +| Directory Delete | `O(files)` | + +That is: the more files there are, the slower directory operations get. + + +Further reading: [Azure Storage Scalability Targets](https://docs.microsoft.com/en-us/azure/storage/common/storage-scalability-targets?toc=%2fazure%2fstorage%2fqueues%2ftoc.json) + +### Extensibility + +The ABFS connector supports a number of limited-private/unstable extension +points for third-parties to integrate their authentication and authorization +services into the ABFS client. + +* `CustomDelegationTokenManager` : adds ability to issue Hadoop Delegation Tokens. +* `AbfsAuthorizer` permits client-side authorization of file operations. +* `CustomTokenProviderAdaptee`: allows for custom provision of +Azure OAuth tokens. +* `KeyProvider`. + +Consult the source in `org.apache.hadoop.fs.azurebfs.extensions` +and all associated tests to see how to make use of these extension points. + +_Warning_ These extension points are unstable. + +## Other configuration options + +Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys`, +`org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations` and +`org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list +of configuration options and their default values. + + +## Troubleshooting + +The problems associated with the connector usually come down to, in order + +1. Classpath. +1. Network setup (proxy etc.). +1. Authentication and Authorization. +1. Anything else. + +If you log `org.apache.hadoop.fs.azurebfs.services` at `DEBUG` then you will +see more details about any request which is failing. + +One useful tool for debugging connectivity is the [cloudstore storediag utility](https://github.com/steveloughran/cloudstore/releases). + +This validates the classpath, the settings, then tries to work with the filesystem. + +```bash +bin/hadoop jar cloudstore-0.1-SNAPSHOT.jar storediag abfs://container@account.dfs.core.windows.net/ +``` + +1. If the `storediag` command cannot work with an abfs store, nothing else is likely to. +1. If the `storediag` store does successfully work, that does not guarantee that the classpath +or configuration on the rest of the cluster is also going to work, especially +in distributed applications. But it is at least a start. + +### `ClassNotFoundException: org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem` + +The `hadoop-azure` JAR is not on the classpah. + +``` +java.lang.RuntimeException: java.lang.ClassNotFoundException: + Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found + at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2625) + at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3290) + at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3322) + at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:136) + at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3373) + at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3341) + at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:491) + at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361) +Caused by: java.lang.ClassNotFoundException: + Class org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem not found + at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2529) + at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2623) + ... 16 more +``` + +Tip: if this is happening on the command line, you can turn on debug logging +of the hadoop scripts: + +```bash +export HADOOP_SHELL_SCRIPT_DEBUG=true +``` + +If this is happening on an application running within the cluster, it means +the cluster (somehow) needs to be configured so that the `hadoop-azure` +module and dependencies are on the classpath of deployed applications. + +### `ClassNotFoundException: com.microsoft.azure.storage.StorageErrorCode` + +The `azure-storage` JAR is not on the classpath. + +### `Server failed to authenticate the request` + +The request wasn't authenticated while using the default shared-key +authentication mechanism. + +``` +Operation failed: "Server failed to authenticate the request. + Make sure the value of Authorization header is formed correctly including the signature.", + 403, HEAD, https://account.dfs.core.windows.net/container2?resource=filesystem&timeout=90 + at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:135) + at org.apache.hadoop.fs.azurebfs.services.AbfsClient.getFilesystemProperties(AbfsClient.java:209) + at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getFilesystemProperties(AzureBlobFileSystemStore.java:259) + at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.fileSystemExists(AzureBlobFileSystem.java:859) + at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110) +``` + +Causes include: + +* Your credentials are incorrect. +* Your shared secret has expired. in Azure, this happens automatically +* Your shared secret has been revoked. +* host/VM clock drift means that your client's clock is out of sync with the +Azure servers —the call is being rejected as it is either out of date (considered a replay) +or from the future. Fix: Check your clocks, etc. + +### `Configuration property _something_.dfs.core.windows.net not found` + +There's no `fs.azure.account.key.` entry in your cluster configuration declaring the +access key for the specific account, or you are using the wrong URL + +``` +$ hadoop fs -ls abfs://container@abfswales2.dfs.core.windows.net/ + +ls: Configuration property abfswales2.dfs.core.windows.net not found. +``` + +* Make sure that the URL is correct +* Add the missing account key. + + +### `No such file or directory when trying to list a container` + +There is no container of the given name. Either it has been mistyped +or the container needs to be created. + +``` +$ hadoop fs -ls abfs://container@abfswales1.dfs.core.windows.net/ + +ls: `abfs://container@abfswales1.dfs.core.windows.net/': No such file or directory +``` + +* Make sure that the URL is correct +* Create the container if needed + +### "HTTP connection to https://login.microsoftonline.com/_something_ failed for getting token from AzureAD. Http response: 200 OK" + ++ it has a content-type `text/html`, `text/plain`, `application/xml` + +The OAuth authentication page didn't fail with an HTTP error code, but it didn't return JSON either + +``` +$ bin/hadoop fs -ls abfs://container@abfswales1.dfs.core.windows.net/ + + ... + +ls: HTTP Error 200; + url='https://login.microsoftonline.com/02a07549-0a5f-4c91-9d76-53d172a638a2/oauth2/authorize' + AADToken: HTTP connection to + https://login.microsoftonline.com/02a07549-0a5f-4c91-9d76-53d172a638a2/oauth2/authorize + failed for getting token from AzureAD. + Unexpected response. + Check configuration, URLs and proxy settings. + proxies=none; + requestId='dd9d526c-8b3d-4b3f-a193-0cf021938600'; + contentType='text/html; charset=utf-8'; +``` + +Likely causes are configuration and networking: + +1. Authentication is failing, the caller is being served up the Azure Active Directory +signon page for humans, even though it is a machine calling. +1. The URL is wrong —it is pointing at a web page unrelated to OAuth2.0 +1. There's a proxy server in the way trying to return helpful instructions. + +## Testing ABFS + +See the relevant section in [Testing Azure](testing_azure.html). diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index 3f29a6e0af..11d0a18b55 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -16,17 +16,26 @@ +See also: + +* [ABFS](./abfs.html) +* [Testing](./testing_azure.html) + ## Introduction -The hadoop-azure module provides support for integration with +The `hadoop-azure` module provides support for integration with [Azure Blob Storage](http://azure.microsoft.com/en-us/documentation/services/storage/). -The built jar file, named hadoop-azure.jar, also declares transitive dependencies +The built jar file, named `hadoop-azure.jar`, also declares transitive dependencies on the additional artifacts it requires, notably the [Azure Storage SDK for Java](https://github.com/Azure/azure-storage-java). To make it part of Apache Hadoop's default classpath, simply make sure that -HADOOP_OPTIONAL_TOOLS in hadoop-env.sh has 'hadoop-azure' in the list. +`HADOOP_OPTIONAL_TOOLS`in `hadoop-env.sh` has `'hadoop-azure` in the list. +Example: +```bash +export HADOOP_OPTIONAL_TOOLS="hadoop-azure,hadoop-azure-datalake" +``` ## Features * Read and write data stored in an Azure Blob Storage account. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index cb9549d81d..fc2258997b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; @@ -342,4 +343,13 @@ protected Path path(String filepath) throws IOException { new Path(getTestPath(), filepath)); } + /** + * Get any Delegation Token manager created by the filesystem. + * @return the DT manager or null. + * @throws IOException failure + */ + protected AbfsDelegationTokenManager getDelegationTokenManager() + throws IOException { + return getFileSystem().getDelegationTokenManager(); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java index 424361b247..518859c302 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsIdentityTransformer.java @@ -62,6 +62,7 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{ public ITestAbfsIdentityTransformer() throws Exception { super(); + UserGroupInformation.reset(); userGroupInfo = UserGroupInformation.getCurrentUser(); localUser = userGroupInfo.getShortUserName(); localGroup = userGroupInfo.getPrimaryGroupName(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ClassicDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ClassicDelegationTokenManager.java new file mode 100644 index 0000000000..f87fc654f0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ClassicDelegationTokenManager.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.Charset; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_DELEGATION_TOKEN; +import static org.apache.hadoop.fs.azurebfs.extensions.KerberizedAbfsCluster.newURI; +import static org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier.decodeIdentifier; + +/** + * This is a Stub DT manager for testing, one which + * implements the the {@link CustomDelegationTokenManager} API, but + * not the extended one. + * + * Member variables are updated as operations are performed, so + * test cases can make assertions about the state of the plugin. + */ +public class ClassicDelegationTokenManager + implements CustomDelegationTokenManager { + + private static final Logger LOG = LoggerFactory.getLogger( + ClassicDelegationTokenManager.class); + + /** + * Classname. + */ + public static final String NAME + = "org.apache.hadoop.fs.azurebfs.extensions.ClassicDelegationTokenManager"; + + /** + * If this the DT is unbound, this is used for the service kind. + */ + public static final String UNSET = "abfs://user@unset.dfs.core.windows.net/"; + + /** + * The URI used when creating a token for an unset binding. + */ + public static final URI UNSET_URI = newURI(UNSET); + + private URI fsURI; + + private boolean initialized; + + private boolean closed; + + private int renewals; + + private int cancellations; + + private int issued; + + private Text kind; + + private UserGroupInformation owner; + + private String canonicalServiceName; + + /** + * Instantiate. + */ + public ClassicDelegationTokenManager() { + } + + @Override + public void initialize(final Configuration configuration) throws IOException { + initialized = true; + owner = UserGroupInformation.getCurrentUser(); + LOG.info("Creating Stub DT manager for {}", owner.getUserName()); + } + + public void close() { + closed = true; + } + + @Override + public Token getDelegationToken(final String renewer) + throws IOException { + // guarantees issued + issued++; + + URI uri = fsURI != null ? fsURI : UNSET_URI; + Text renewerT = new Text(renewer != null ? renewer : ""); + Token t = createToken(issued, uri, new Text(owner.getUserName()), + renewerT); + if (kind != null) { + t.setKind(kind); + } + t.setService(createServiceText()); + LOG.info("Created token {}", t); + return t; + } + + public Text createServiceText() { + return new Text(fsURI != null ? fsURI.toString() : UNSET); + } + + /** + * Create a token. + * + * @param sequenceNumber sequence number. + * @param uri FS URI + * @param owner FS owner + * @param renewer renewer + * @return a token. + */ + public static Token createToken( + final int sequenceNumber, + final URI uri, + final Text owner, + final Text renewer) { + StubAbfsTokenIdentifier id + = new StubAbfsTokenIdentifier(uri, owner, renewer); + id.setSequenceNumber(sequenceNumber); + Token token = new Token( + id, + new TokenSecretManager()); + + return token; + } + + @Override + public long renewDelegationToken(final Token token) throws IOException { + renewals++; + decodeIdentifier(token); + return 0; + } + + @Override + public void cancelDelegationToken(final Token token) throws IOException { + cancellations++; + decodeIdentifier(token); + } + + protected void innerBind(final URI uri, final Configuration conf) + throws IOException { + Preconditions.checkState(initialized, "Not initialized"); + Preconditions.checkState(fsURI == null, "already bound"); + fsURI = uri; + canonicalServiceName = uri.toString(); + LOG.info("Bound to {}", fsURI); + } + + public String getCanonicalServiceName() { + return canonicalServiceName; + } + + public void setCanonicalServiceName(final String canonicalServiceName) { + this.canonicalServiceName = canonicalServiceName; + } + + public URI getFsURI() { + return fsURI; + } + + public boolean isInitialized() { + return initialized; + } + + public boolean isBound() { + return fsURI != null; + } + + public boolean isClosed() { + return closed; + } + + public int getRenewals() { + return renewals; + } + + public int getCancellations() { + return cancellations; + } + + public int getIssued() { + return issued; + } + + public Text getKind() { + return kind; + } + + public void setKind(final Text kind) { + this.kind = kind; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "StubDelegationTokenManager{"); + sb.append("fsURI=").append(fsURI); + sb.append(", initialized=").append(initialized); + sb.append(", closed=").append(closed); + sb.append(", renewals=").append(renewals); + sb.append(", cancellations=").append(cancellations); + sb.append(", issued=").append(issued); + sb.append('}'); + return sb.toString(); + } + + /** + * Patch a configuration to declare this the DT provider for a filesystem + * built off the given configuration. + * The ABFS Filesystem still needs to come up with security enabled. + * @param conf configuration. + * @return the patched configuration. + */ + public static Configuration useClassicDTManager(Configuration conf) { + conf.setBoolean(FS_AZURE_ENABLE_DELEGATION_TOKEN, true); + conf.set(FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE, + ClassicDelegationTokenManager.NAME); + return conf; + } + + /** + * Get the password to use in secret managers. + * This is a constant; its just recalculated every time to stop findbugs + * highlighting security risks of shared mutable byte arrays. + * @return a password. + */ + private static byte[] getSecretManagerPasssword() { + return "non-password".getBytes(Charset.forName("UTF-8")); + } + + /** + * The secret manager always uses the same secret; the + * factory for new identifiers is that of the token manager. + */ + protected static class TokenSecretManager + extends SecretManager { + + public TokenSecretManager() { + } + + @Override + protected byte[] createPassword(StubAbfsTokenIdentifier identifier) { + return getSecretManagerPasssword(); + } + + @Override + public byte[] retrievePassword(StubAbfsTokenIdentifier identifier) + throws InvalidToken { + return getSecretManagerPasssword(); + } + + @Override + public StubAbfsTokenIdentifier createIdentifier() { + return new StubAbfsTokenIdentifier(); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ITestAbfsDelegationTokens.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ITestAbfsDelegationTokens.java new file mode 100644 index 0000000000..d2c852a70b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/ITestAbfsDelegationTokens.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.DtUtilShell; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP; +import static org.apache.hadoop.test.LambdaTestUtils.doAs; + +/** + * Test custom DT support in ABFS. + * This brings up a mini KDC in class setup/teardown, as the FS checks + * for that when it enables security. + * + * Much of this code is copied from + * {@code org.apache.hadoop.fs.s3a.auth.delegation.AbstractDelegationIT} + */ +public class ITestAbfsDelegationTokens extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = LoggerFactory.getLogger( + ITestAbfsDelegationTokens.class); + + /** + * Created in static {@link #setupCluster()} call. + */ + @SuppressWarnings("StaticNonFinalField") + private static KerberizedAbfsCluster cluster; + + private UserGroupInformation aliceUser; + + /*** + * Set up the clusters. + */ + @BeforeClass + public static void setupCluster() throws Exception { + resetUGI(); + cluster = new KerberizedAbfsCluster(); + cluster.init(new Configuration()); + cluster.start(); + } + + /** + * Tear down the Cluster. + */ + @SuppressWarnings("ThrowableNotThrown") + @AfterClass + public static void teardownCluster() throws Exception { + resetUGI(); + ServiceOperations.stopQuietly(LOG, cluster); + } + + public ITestAbfsDelegationTokens() throws Exception { + } + + @Override + public void setup() throws Exception { + // create the FS + Configuration conf = getRawConfiguration(); + cluster.bindConfToCluster(conf); + conf.setBoolean(HADOOP_SECURITY_TOKEN_SERVICE_USE_IP, + false); + resetUGI(); + UserGroupInformation.setConfiguration(conf); + aliceUser = cluster.createAliceUser(); + + assertSecurityEnabled(); + // log in as alice so that filesystems belong to that user + UserGroupInformation.setLoginUser(aliceUser); + StubDelegationTokenManager.useStubDTManager(conf); + FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser()); + super.setup(); + assertNotNull("No StubDelegationTokenManager created in filesystem init", + getStubDTManager()); + } + + protected StubDelegationTokenManager getStubDTManager() throws IOException { + return (StubDelegationTokenManager) getDelegationTokenManager().getTokenManager(); + } + + /** + * Cleanup removes cached filesystems and the last instance of the + * StubDT manager. + */ + @Override + public void teardown() throws Exception { + // clean up all of alice's instances. + FileSystem.closeAllForUGI(UserGroupInformation.getLoginUser()); + super.teardown(); + } + + /** + * General assertion that security is turred on for a cluster. + */ + public static void assertSecurityEnabled() { + assertTrue("Security is needed for this test", + UserGroupInformation.isSecurityEnabled()); + } + + /** + * Reset UGI info. + */ + protected static void resetUGI() { + UserGroupInformation.reset(); + } + + /** + * Create credentials with the DTs of the given FS. + * @param fs filesystem + * @return a non-empty set of credentials. + * @throws IOException failure to create. + */ + protected static Credentials mkTokens(final FileSystem fs) + throws IOException { + Credentials cred = new Credentials(); + fs.addDelegationTokens("rm/rm1@EXAMPLE.COM", cred); + return cred; + } + + @Test + public void testTokenManagerBinding() throws Throwable { + StubDelegationTokenManager instance + = getStubDTManager(); + assertNotNull("No StubDelegationTokenManager created in filesystem init", + instance); + assertTrue("token manager not initialized: " + instance, + instance.isInitialized()); + } + + /** + * When bound to a custom DT manager, it provides the service name. + * The stub returns the URI by default. + */ + @Test + public void testCanonicalization() throws Throwable { + String service = getCanonicalServiceName(); + assertNotNull("No canonical service name from filesystem " + getFileSystem(), + service); + assertEquals("canonical URI and service name mismatch", + getFilesystemURI(), new URI(service)); + } + + protected URI getFilesystemURI() throws IOException { + return getFileSystem().getUri(); + } + + protected String getCanonicalServiceName() throws IOException { + return getFileSystem().getCanonicalServiceName(); + } + + /** + * Checks here to catch any regressions in canonicalization + * logic. + */ + @Test + public void testDefaultCanonicalization() throws Throwable { + FileSystem fs = getFileSystem(); + clearTokenServiceName(); + + assertEquals("canonicalServiceName is not the default", + getDefaultServiceName(fs), getCanonicalServiceName()); + } + + protected String getDefaultServiceName(final FileSystem fs) { + return SecurityUtil.buildDTServiceName(fs.getUri(), 0); + } + + protected void clearTokenServiceName() throws IOException { + getStubDTManager().setCanonicalServiceName(null); + } + + /** + * Request a token; this tests the collection workflow. + */ + @Test + public void testRequestToken() throws Throwable { + AzureBlobFileSystem fs = getFileSystem(); + Credentials credentials = mkTokens(fs); + assertEquals("Number of collected tokens", 1, + credentials.numberOfTokens()); + verifyCredentialsContainsToken(credentials, fs); + } + + /** + * Request a token; this tests the collection workflow. + */ + @Test + public void testRequestTokenDefault() throws Throwable { + clearTokenServiceName(); + + AzureBlobFileSystem fs = getFileSystem(); + assertEquals("canonicalServiceName is not the default", + getDefaultServiceName(fs), fs.getCanonicalServiceName()); + + Credentials credentials = mkTokens(fs); + assertEquals("Number of collected tokens", 1, + credentials.numberOfTokens()); + verifyCredentialsContainsToken(credentials, + getDefaultServiceName(fs), getFilesystemURI().toString()); + } + + public void verifyCredentialsContainsToken(final Credentials credentials, + FileSystem fs) throws IOException { + verifyCredentialsContainsToken(credentials, + fs.getCanonicalServiceName(), + fs.getUri().toString()); + } + + /** + * Verify that the set of credentials contains a token for the given + * canonical service name, and that it is of the given kind. + * @param credentials set of credentials + * @param serviceName canonical service name for lookup. + * @param tokenService service kind; also expected in string value. + * @return the retrieved token. + * @throws IOException IO failure + */ + public StubAbfsTokenIdentifier verifyCredentialsContainsToken( + final Credentials credentials, + final String serviceName, + final String tokenService) throws IOException { + Token token = credentials.getToken( + new Text(serviceName)); + + assertEquals("Token Kind in " + token, + StubAbfsTokenIdentifier.TOKEN_KIND, token.getKind()); + assertEquals("Token Service Kind in " + token, + tokenService, token.getService().toString()); + + StubAbfsTokenIdentifier abfsId = (StubAbfsTokenIdentifier) + token.decodeIdentifier(); + LOG.info("Created token {}", abfsId); + assertEquals("token URI in " + abfsId, + tokenService, abfsId.getUri().toString()); + return abfsId; + } + + /** + * This mimics the DT collection performed inside FileInputFormat to + * collect DTs for a job. + * @throws Throwable on failure. + */ + @Test + public void testJobsCollectTokens() throws Throwable { + // get tokens for all the required FileSystems.. + AzureBlobFileSystem fs = getFileSystem(); + Credentials credentials = new Credentials(); + Path root = fs.makeQualified(new Path("/")); + Path[] paths = {root}; + + Configuration conf = fs.getConf(); + TokenCache.obtainTokensForNamenodes(credentials, + paths, + conf); + verifyCredentialsContainsToken(credentials, fs); + } + + /** + * Run the DT Util command. + * @param expected expected outcome + * @param conf configuration for the command (hence: FS to create) + * @param args other arguments + * @return the output of the command. + */ + protected String dtutil(final int expected, + final Configuration conf, + final String... args) throws Exception { + final ByteArrayOutputStream dtUtilContent = new ByteArrayOutputStream(); + DtUtilShell dt = new DtUtilShell(); + dt.setOut(new PrintStream(dtUtilContent)); + dtUtilContent.reset(); + int r = doAs(aliceUser, + () -> ToolRunner.run(conf, dt, args)); + String s = dtUtilContent.toString(); + LOG.info("\n{}", s); + assertEquals("Exit code from command dtutil " + + StringUtils.join(" ", args) + " with output " + s, + expected, r); + return s; + } + + /** + * Verify the dtutil shell command can fetch tokens + */ + @Test + public void testDTUtilShell() throws Throwable { + File tokenfile = cluster.createTempTokenFile(); + + String tfs = tokenfile.toString(); + String fsURI = getFileSystem().getUri().toString(); + dtutil(0, getRawConfiguration(), + "get", fsURI, + "-format", "protobuf", + tfs); + assertTrue("not created: " + tokenfile, + tokenfile.exists()); + assertTrue("File is empty " + tokenfile, + tokenfile.length() > 0); + assertTrue("File only contains header " + tokenfile, + tokenfile.length() > 6); + + String printed = dtutil(0, getRawConfiguration(), "print", tfs); + assertTrue("no " + fsURI + " in " + printed, + printed.contains(fsURI)); + assertTrue("no " + StubAbfsTokenIdentifier.ID + " in " + printed, + printed.contains(StubAbfsTokenIdentifier.ID)); + } + + /** + * Creates a new FS instance with the simplest binding lifecycle; + * get a token. + * This verifies the classic binding mechanism works. + */ + @Test + public void testBaseDTLifecycle() throws Throwable { + + Configuration conf = new Configuration(getRawConfiguration()); + ClassicDelegationTokenManager.useClassicDTManager(conf); + try (FileSystem fs = FileSystem.newInstance(getFilesystemURI(), conf)) { + Credentials credentials = mkTokens(fs); + assertEquals("Number of collected tokens", 1, + credentials.numberOfTokens()); + verifyCredentialsContainsToken(credentials, + fs.getCanonicalServiceName(), + ClassicDelegationTokenManager.UNSET); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/KerberizedAbfsCluster.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/KerberizedAbfsCluster.java new file mode 100644 index 0000000000..35444f8e44 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/KerberizedAbfsCluster.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.KDiag; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.ssl.KeyStoreTestUtil; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytabAndReturnUGI; +import static org.junit.Assert.assertTrue; + +/** + * composite service for adding kerberos login for ABFS + * tests which require a logged in user. + * Based on + * {@code org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster} + */ +public class KerberizedAbfsCluster extends CompositeService { + + private static final Logger LOG = + LoggerFactory.getLogger(KerberizedAbfsCluster.class); + + public static final String ALICE = "alice"; + + public static final String BOB = "bob"; + + public static final String HTTP_LOCALHOST = "HTTP/localhost@$LOCALHOST"; + + /** + * The hostname is dynamically determined based on OS, either + * "localhost" (non-windows) or 127.0.0.1 (windows). + */ + public static final String LOCALHOST_NAME = Path.WINDOWS + ? "127.0.0.1" + : "localhost"; + + private MiniKdc kdc; + + private File keytab; + + private File workDir; + + private String krbInstance; + + private String loginUsername; + + private String loginPrincipal; + + private String sslConfDir; + + private String clientSSLConfigFileName; + + private String serverSSLConfigFileName; + + private String alicePrincipal; + + private String bobPrincipal; + + /** + * Create the cluster. + * If this class's log is at DEBUG level, this also turns + * Kerberos diagnostics on in the JVM. + */ + public KerberizedAbfsCluster() { + super("KerberizedAbfsCluster"); + // load all the configs to force in the -default.xml files + new JobConf(); + if (LOG.isDebugEnabled()) { + // turn on kerberos logging @ debug. + System.setProperty(KDiag.SUN_SECURITY_KRB5_DEBUG, "true"); + System.setProperty(KDiag.SUN_SECURITY_SPNEGO_DEBUG, "true"); + } + + } + + public MiniKdc getKdc() { + return kdc; + } + + public File getKeytab() { + return keytab; + } + + public String getKeytabPath() { + return keytab.getAbsolutePath(); + } + + public UserGroupInformation createBobUser() throws IOException { + return loginUserFromKeytabAndReturnUGI(bobPrincipal, + keytab.getAbsolutePath()); + } + + public UserGroupInformation createAliceUser() throws IOException { + return loginUserFromKeytabAndReturnUGI(alicePrincipal, + keytab.getAbsolutePath()); + } + + public File getWorkDir() { + return workDir; + } + + public String getKrbInstance() { + return krbInstance; + } + + public String getLoginUsername() { + return loginUsername; + } + + public String getLoginPrincipal() { + return loginPrincipal; + } + + public String withRealm(String user) { + return user + "@EXAMPLE.COM"; + } + + /** + * Service init creates the KDC. + * @param conf configuration + */ + @Override + protected void serviceInit(final Configuration conf) throws Exception { + patchConfigAtInit(conf); + super.serviceInit(conf); + Properties kdcConf = MiniKdc.createConf(); + workDir = GenericTestUtils.getTestDir("kerberos"); + workDir.mkdirs(); + kdc = new MiniKdc(kdcConf, workDir); + + krbInstance = LOCALHOST_NAME; + } + + /** + * Start the KDC, create the keytab and the alice and bob users, + * and UGI instances of them logged in from the keytab. + */ + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + kdc.start(); + keytab = new File(workDir, "keytab.bin"); + loginUsername = UserGroupInformation.getLoginUser().getShortUserName(); + loginPrincipal = loginUsername + "/" + krbInstance; + + alicePrincipal = ALICE + "/" + krbInstance; + bobPrincipal = BOB + "/" + krbInstance; + kdc.createPrincipal(keytab, + alicePrincipal, + bobPrincipal, + "HTTP/" + krbInstance, + HTTP_LOCALHOST, + loginPrincipal); + final File keystoresDir = new File(workDir, "ssl"); + keystoresDir.mkdirs(); + sslConfDir = KeyStoreTestUtil.getClasspathDir( + this.getClass()); + KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), + sslConfDir, getConfig(), false); + clientSSLConfigFileName = KeyStoreTestUtil.getClientSSLConfigFileName(); + serverSSLConfigFileName = KeyStoreTestUtil.getServerSSLConfigFileName(); + String kerberosRule = + "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\nDEFAULT"; + KerberosName.setRules(kerberosRule); + } + + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + // this can throw an exception, but it will get caught by the superclass. + kdc.stop(); + } + + + protected void patchConfigAtInit(final Configuration conf) { + + // turn off some noise during debugging + int timeout = (int) Duration.ofHours(1).toMillis(); + conf.setInt("jvm.pause.info-threshold.ms", timeout); + conf.setInt("jvm.pause.warn-threshold.ms", timeout); + } + + public void resetUGI() { + UserGroupInformation.reset(); + } + + /** + * Given a shortname, built a long name with the krb instance and realm info. + * @param shortname short name of the user + * @return a long name + */ + private String userOnHost(final String shortname) { + return shortname + "/" + krbInstance + "@" + getRealm(); + } + + public String getRealm() { + return kdc.getRealm(); + } + + /** + * Log in a user to UGI.currentUser. + * @param user user to log in from + * @throws IOException failure + */ + public void loginUser(final String user) throws IOException { + UserGroupInformation.loginUserFromKeytab(user, getKeytabPath()); + } + + /** + * Log in the login principal as the current user. + * @throws IOException failure + */ + public void loginPrincipal() throws IOException { + loginUser(getLoginPrincipal()); + } + + /** + * General assertion that security is turred on for a cluster. + */ + public static void assertSecurityEnabled() { + assertTrue("Security is needed for this test", + UserGroupInformation.isSecurityEnabled()); + } + + + /** + * Close filesystems for a user, downgrading a null user to a no-op. + * @param ugi user + * @throws IOException if a close operation raised one. + */ + public static void closeUserFileSystems(UserGroupInformation ugi) + throws IOException { + if (ugi != null) { + FileSystem.closeAllForUGI(ugi); + } + } + + /** + * Modify a configuration to use Kerberos as the auth method. + * @param conf configuration to patch. + */ + public void bindConfToCluster(Configuration conf) { + conf.set(HADOOP_SECURITY_AUTHENTICATION, + UserGroupInformation.AuthenticationMethod.KERBEROS.name()); + conf.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES, + "alice,alice"); + // a shortname for the RM principal avoids kerberos mapping problems. + conf.set(YarnConfiguration.RM_PRINCIPAL, BOB); + } + + /** + * Utility method to create a URI, converting URISyntaxException + * to RuntimeExceptions. This makes it easier to set up URIs + * in static fields. + * @param uri URI to create. + * @return the URI. + * @throws RuntimeException syntax error. + */ + public static URI newURI(String uri) { + try { + return new URI(uri); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + /** + * Create the filename for a temporary token file, in the + * work dir of this cluster. + * @return a filename which does not exist. + * @throws IOException failure + */ + public File createTempTokenFile() throws IOException { + File tokenfile = File.createTempFile("tokens", ".bin", + getWorkDir()); + tokenfile.delete(); + return tokenfile; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubAbfsTokenIdentifier.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubAbfsTokenIdentifier.java new file mode 100644 index 0000000000..4271ba6dfd --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubAbfsTokenIdentifier.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URI; +import java.time.Clock; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import java.util.Objects; +import java.util.UUID; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; + +/** + * Token identifier for testing ABFS DT support; matched with + * a service declaration so it can be unmarshalled. + */ +public class StubAbfsTokenIdentifier extends DelegationTokenIdentifier { + + public static final String ID = "StubAbfsTokenIdentifier"; + + public static final int MAX_TEXT_LENGTH = 512; + + public static final Text TOKEN_KIND = new Text(ID); + + /** Canonical URI of the store. */ + private URI uri; + + /** + * Timestamp of creation. + * This is set to the current time; it will be overridden when + * deserializing data. + */ + private long created = System.currentTimeMillis(); + + /** + * This marshalled UUID can be used in testing to verify transmission, + * and reuse; as it is printed you can see what is happending too. + */ + private String uuid = UUID.randomUUID().toString(); + + + /** + * This is the constructor used for deserialization, so there's + * no need to fill in all values. + */ + public StubAbfsTokenIdentifier() { + super(TOKEN_KIND); + } + + /** + * Create. + * @param uri owner UI + * @param owner token owner + * @param renewer token renewer + */ + public StubAbfsTokenIdentifier( + final URI uri, + final Text owner, + final Text renewer) { + + super(TOKEN_KIND, owner, renewer, new Text()); + this.uri = uri; + Clock clock = Clock.systemDefaultZone(); + + long now = clock.millis(); + Instant nowTime = Instant.ofEpochMilli(now); + setIssueDate(now); + setMaxDate(nowTime.plus(1, ChronoUnit.HOURS).toEpochMilli()); + } + + public static StubAbfsTokenIdentifier decodeIdentifier(final Token token) + throws IOException { + StubAbfsTokenIdentifier id + = (StubAbfsTokenIdentifier) token.decodeIdentifier(); + Preconditions.checkNotNull(id, "Null decoded identifier"); + return id; + } + + public URI getUri() { + return uri; + } + + public long getCreated() { + return created; + } + + + public String getUuid() { + return uuid; + } + + /** + * Write state. + * {@link org.apache.hadoop.io.Writable#write(DataOutput)}. + * @param out destination + * @throws IOException failure + */ + @Override + public void write(final DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, uri.toString()); + Text.writeString(out, uuid); + out.writeLong(created); + } + + /** + * Read state. + * {@link org.apache.hadoop.io.Writable#readFields(DataInput)}. + * + * Note: this operation gets called in toString() operations on tokens, so + * must either always succeed, or throw an IOException to trigger the + * catch & downgrade. RuntimeExceptions (e.g. Preconditions checks) are + * not to be used here for this reason.) + * + * @param in input stream + * @throws IOException IO problems. + */ + @Override + public void readFields(final DataInput in) + throws IOException { + super.readFields(in); + uri = URI.create(Text.readString(in, MAX_TEXT_LENGTH)); + uuid = Text.readString(in, MAX_TEXT_LENGTH); + created = in.readLong(); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "AbfsIDBTokenIdentifier{"); + sb.append("uri=").append(uri); + sb.append(", uuid='").append(uuid).append('\''); + sb.append(", created='").append(new Date(created)).append('\''); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + final StubAbfsTokenIdentifier that = (StubAbfsTokenIdentifier) o; + return created == that.created + && uri.equals(that.uri) + && uuid.equals(that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), uri, uuid); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubDelegationTokenManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubDelegationTokenManager.java new file mode 100644 index 0000000000..dbd24bfe1e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/StubDelegationTokenManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.IOException; +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_DELEGATION_TOKEN; + +/** + * This is a Stub DT manager which adds support for {@link BoundDTExtension} + * to {@link ClassicDelegationTokenManager}. + */ +public class StubDelegationTokenManager extends ClassicDelegationTokenManager + implements BoundDTExtension { + + private static final Logger LOG = LoggerFactory.getLogger( + StubDelegationTokenManager.class); + + /** + * Classname. + */ + public static final String NAME + = "org.apache.hadoop.fs.azurebfs.extensions.StubDelegationTokenManager"; + + /** + * Instantiate. + */ + public StubDelegationTokenManager() { + } + + @Override + public void bind(final URI uri, final Configuration conf) + throws IOException { + super.innerBind(uri, conf); + } + + /** + * Create a token. + * + * @param sequenceNumber sequence number. + * @param uri FS URI + * @param owner FS owner + * @param renewer renewer + * @return a token. + */ + public static Token createToken( + final int sequenceNumber, + final URI uri, + final Text owner, + final Text renewer) { + return ClassicDelegationTokenManager.createToken(sequenceNumber, uri, owner, + renewer); + } + + /** + * Patch a configuration to declare this the DT provider for a filesystem + * built off the given configuration. + * The ABFS Filesystem still needs to come up with security enabled. + * @param conf configuration. + * @return the patched configuration. + */ + public static Configuration useStubDTManager(Configuration conf) { + conf.setBoolean(FS_AZURE_ENABLE_DELEGATION_TOKEN, true); + conf.set(FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE, + StubDelegationTokenManager.NAME); + return conf; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestCustomOauthTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestCustomOauthTokenProvider.java new file mode 100644 index 0000000000..6d9dc5a98f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestCustomOauthTokenProvider.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.net.URI; +import java.util.Date; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout; +import org.apache.hadoop.fs.azurebfs.oauth2.AzureADToken; +import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter; + +import static org.apache.hadoop.fs.azurebfs.extensions.WrappingTokenProvider.*; + +/** + * Test custom OAuth token providers. + * This is a unit test not an E2E integration test because that would + * require OAuth auth setup, always. + * Instead this just checks that the creation works and that everything + * is propagated. + */ +@SuppressWarnings("UseOfObsoleteDateTimeApi") +public class TestCustomOauthTokenProvider extends AbstractAbfsTestWithTimeout { + + public TestCustomOauthTokenProvider() throws Exception { + } + + /** + * If you switch to a custom provider, it is loaded and initialized. + */ + @Test + public void testCustomProviderBinding() throws Throwable { + Configuration conf = new Configuration(); + WrappingTokenProvider.enable(conf); + AbfsConfiguration abfs = new AbfsConfiguration(conf, + "not-a-real-account"); + CustomTokenProviderAdapter provider = + (CustomTokenProviderAdapter) abfs.getTokenProvider(); + assertEquals("User agent", INITED, provider.getUserAgentSuffix()); + + // now mimic the bind call + ExtensionHelper.bind(provider, + new URI("abfs://store@user.dfs.core.windows.net"), + conf); + assertEquals("User agent", BOUND, + ExtensionHelper.getUserAgentSuffix(provider, "")); + AzureADToken token = provider.getToken(); + assertEquals("Access token propagation", + ACCESS_TOKEN, token.getAccessToken()); + Date expiry = token.getExpiry(); + long time = expiry.getTime(); + assertTrue("date wrong: " + expiry, + time <= System.currentTimeMillis()); + // once closed, the UA state changes. + provider.close(); + assertEquals("User agent", CLOSED, provider.getUserAgentSuffix()); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestDTManagerLifecycle.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestDTManagerLifecycle.java new file mode 100644 index 0000000000..5566a4b535 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/TestDTManagerLifecycle.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.net.URI; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsTestWithTimeout; +import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; + +import static org.apache.hadoop.fs.azurebfs.extensions.KerberizedAbfsCluster.newURI; +import static org.apache.hadoop.fs.azurebfs.extensions.StubDelegationTokenManager.createToken; +import static org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier.decodeIdentifier; + +/** + * Test the lifecycle of custom DT managers. + */ +@SuppressWarnings("IOResourceOpenedButNotSafelyClosed") +public class TestDTManagerLifecycle extends AbstractAbfsTestWithTimeout { + + public static final String RENEWER = "resourcemanager"; + + private Configuration conf; + + public static final String ABFS + = "abfs://testing@account.dfs.core.windows.net"; + + public static final URI FSURI = newURI(ABFS); + + public static final Text OWNER = new Text("owner"); + + public static final Text KIND2 = new Text("kind2"); + + @Before + public void setup() throws Exception { + conf = StubDelegationTokenManager.useStubDTManager(new Configuration()); + } + + @After + public void teardown() throws Exception { + } + + /** + * Assert that a token is of a specific kind + * @param kind expected kind + * @param dt token. + */ + protected void assertTokenKind(final Text kind, + final Token dt) { + assertEquals("Token Kind", + kind, dt.getKind()); + } + + /** + * Test the classic lifecycle, that is: don't call bind() on the manager, + * so that it does not attempt to bind the custom DT manager it has created. + * + * There'll be no canonical service name from the token manager, which + * will trigger falling back to the default value. + */ + @Test + public void testClassicLifecycle() throws Throwable { + AbfsDelegationTokenManager manager + = new AbfsDelegationTokenManager(conf); + StubDelegationTokenManager stub = getTokenManager(manager); + + // this is automatically inited + assertTrue("Not initialized: " + stub, stub.isInitialized()); + Token dt = stub.getDelegationToken(RENEWER); + assertTokenKind(StubAbfsTokenIdentifier.TOKEN_KIND, dt); + + assertNull("canonicalServiceName in " + stub, + manager.getCanonicalServiceName()); + assertEquals("Issued count number in " + stub, 1, stub.getIssued()); + StubAbfsTokenIdentifier id = decodeIdentifier(dt); + assertEquals("Sequence number in " + id, 1, id.getSequenceNumber()); + stub.renewDelegationToken(dt); + assertEquals("Renewal count in " + stub, 1, stub.getRenewals()); + stub.cancelDelegationToken(dt); + assertEquals("Cancel count in " + stub, 1, stub.getCancellations()); + } + + protected StubDelegationTokenManager getTokenManager(final AbfsDelegationTokenManager manager) { + return (StubDelegationTokenManager) manager.getTokenManager(); + } + + /** + * Instantiate through the manager, but then call direct. + */ + @Test + public void testBindingLifecycle() throws Throwable { + AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf); + StubDelegationTokenManager stub = getTokenManager(manager); + assertTrue("Not initialized: " + stub, stub.isInitialized()); + stub.bind(FSURI, conf); + assertEquals("URI in " + stub, FSURI, stub.getFsURI()); + decodeIdentifier(stub.getDelegationToken(RENEWER)); + stub.close(); + assertTrue("Not closed: " + stub, stub.isClosed()); + // and for resilience + stub.close(); + assertTrue("Not closed: " + stub, stub.isClosed()); + } + + @Test + public void testBindingThroughManager() throws Throwable { + AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf); + manager.bind(FSURI, conf); + StubDelegationTokenManager stub = getTokenManager(manager); + assertEquals("Service in " + manager, + ABFS, stub.createServiceText().toString()); + assertEquals("Binding URI of " + stub, FSURI, stub.getFsURI()); + + Token token = manager.getDelegationToken( + RENEWER); + assertEquals("Service in " + token, + ABFS, token.getService().toString()); + decodeIdentifier(token); + assertTokenKind(StubAbfsTokenIdentifier.TOKEN_KIND, token); + + // now change the token kind on the stub, verify propagation + stub.setKind(KIND2); + + Token dt2 = manager.getDelegationToken(""); + assertTokenKind(KIND2, dt2); + + // change the token kind and, unless it is registered, it will not decode. + assertNull("Token is of unknown kind, must not decode", + dt2.decodeIdentifier()); + + // closing the manager will close the stub too. + manager.close(); + assertTrue("Not closed: " + stub, stub.isClosed()); + } + + /** + * Instantiate a DT manager in the renewal workflow: the manager is + * unbound; tokens must still be issued and cancelled. + */ + @Test + public void testRenewalThroughManager() throws Throwable { + + // create without going through the DT manager, which is of course unbound. + Token dt = createToken(0, FSURI, OWNER, + new Text(RENEWER)); + + // create a DT manager in the renewer codepath. + AbfsDelegationTokenManager manager = new AbfsDelegationTokenManager(conf); + StubDelegationTokenManager stub = getTokenManager(manager); + assertNull("Stub should not bebound " + stub, stub.getFsURI()); + + StubAbfsTokenIdentifier dtId = + (StubAbfsTokenIdentifier) dt.decodeIdentifier(); + String idStr = dtId.toString(); + assertEquals("URI in " + idStr, FSURI, dtId.getUri()); + assertEquals("renewer in " + idStr, + RENEWER, dtId.getRenewer().toString()); + manager.renewDelegationToken(dt); + assertEquals("Renewal count in " + stub, 1, stub.getRenewals()); + manager.cancelDelegationToken(dt); + assertEquals("Cancel count in " + stub, 1, stub.getCancellations()); + + // closing the manager will close the stub too. + manager.close(); + assertTrue("Not closed: " + stub, stub.isClosed()); + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/WrappingTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/WrappingTokenProvider.java new file mode 100644 index 0000000000..dea0723f99 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/WrappingTokenProvider.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.IOException; +import java.net.URI; +import java.util.Date; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.services.AuthType; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME; + +/** + * Implements a wrapper around ClientCredsTokenProvider. + */ +@SuppressWarnings("UseOfObsoleteDateTimeApi") +public class WrappingTokenProvider implements CustomTokenProviderAdaptee, + BoundDTExtension { + + public static final String NAME + = "org.apache.hadoop.fs.azurebfs.extensions.WrappingTokenProvider"; + + public static final String UA_STRING = "provider="; + + public static final String CREATED = UA_STRING + "created"; + public static final String INITED = UA_STRING + "inited"; + public static final String BOUND = UA_STRING + "bound"; + public static final String CLOSED = UA_STRING + "closed"; + + public static final String ACCESS_TOKEN = "accessToken"; + + /** URI; only set once bound. */ + private URI uri; + + private String accountName; + + private String state = CREATED; + + @Override + public void initialize( + final Configuration configuration, + final String account) + throws IOException { + state = INITED; + accountName = account; + } + + @Override + public String getAccessToken() throws IOException { + return ACCESS_TOKEN; + } + + @Override + public Date getExpiryTime() { + return new Date(System.currentTimeMillis()); + } + + @Override + public void bind(final URI fsURI, final Configuration conf) + throws IOException { + state = BOUND; + uri = fsURI; + } + + public URI getUri() { + return uri; + } + + @Override + public void close() throws IOException { + state = CLOSED; + } + + @Override + public String getUserAgentSuffix() { + return state; + } + + /** + * Enable the custom token provider. + * This doesn't set any account-specific options. + * @param conf configuration to patch. + */ + public static void enable(Configuration conf) { + conf.setEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, + AuthType.Custom); + conf.set(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME, NAME); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-tools/hadoop-azure/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier new file mode 100644 index 0000000000..d7646dca63 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.hadoop.fs.azurebfs.extensions.StubAbfsTokenIdentifier diff --git a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties index bac431d482..9f72d03653 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-azure/src/test/resources/log4j.properties @@ -58,3 +58,4 @@ log4j.logger.org.apache.hadoop.yarn.util.AbstractLivelinessMonitor=WARN log4j.logger.org.apache.hadoop.security.token.delegation=WARN log4j.logger.org.apache.hadoop.mapred.ShuffleHandler=WARN log4j.logger.org.apache.hadoop.ipc.Server=WARN +log4j.logger.org.apache.hadoop.security.ShellBasedUnixGroupsMapping=ERROR