diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml index 38de35e897..7087d786a3 100644 --- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml @@ -15,6 +15,23 @@ limitations under the License. --> + + + + + + + + + + + + + + @@ -24,7 +41,7 @@ + and helps performance. --> @@ -40,7 +57,7 @@ + method. --> diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index d60bc37734..354176f5ff 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -210,6 +210,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ABFS_LATENCY_TRACK) private boolean trackLatency; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, + MinValue = 0, + DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) + private long sasTokenRenewPeriodForStreamsInSeconds; + private Map storageAccountKeys; public AbfsConfiguration(final Configuration rawConfig, String accountName) @@ -451,6 +456,10 @@ public boolean isCheckAccessEnabled() { return this.isCheckAccessEnabled; } + public long getSasTokenRenewPeriodForStreamsInSeconds() { + return this.sasTokenRenewPeriodForStreamsInSeconds; + } + public String getAzureBlockLocationHost() { return this.azureBlockLocationHost; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 700d23a327..6694c134b4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -679,15 +679,17 @@ public void setXAttr(final Path path, final String name, final byte[] value, fin throw new IllegalArgumentException("A valid name and value must be specified."); } + Path qualifiedPath = makeQualified(path); + try { - Hashtable properties = abfsStore.getPathStatus(path); + Hashtable properties = abfsStore.getPathStatus(qualifiedPath); String xAttrName = ensureValidAttributeName(name); boolean xAttrExists = properties.containsKey(xAttrName); XAttrSetFlag.validate(name, xAttrExists, flag); String xAttrValue = abfsStore.decodeAttribute(value); properties.put(xAttrName, xAttrValue); - abfsStore.setPathProperties(path, properties); + abfsStore.setPathProperties(qualifiedPath, properties); } catch (AzureBlobFileSystemException ex) { checkException(path, ex); } @@ -712,9 +714,11 @@ public byte[] getXAttr(final Path path, final String name) throw new IllegalArgumentException("A valid name must be specified."); } + Path qualifiedPath = makeQualified(path); + byte[] value = null; try { - Hashtable properties = abfsStore.getPathStatus(path); + Hashtable properties = abfsStore.getPathStatus(qualifiedPath); String xAttrName = ensureValidAttributeName(name); if (properties.containsKey(xAttrName)) { String xAttrValue = properties.get(xAttrName); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index d37ceb3d5b..8e0e6c1eb0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -247,8 +247,7 @@ public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException { LOG.debug("Get root ACL status"); try (AbfsPerfInfo perfInfo = startTracking("getIsNamespaceEnabled", "getAclStatus")) { - AbfsRestOperation op = client.getAclStatus( - AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH); + AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.ROOT_PATH); perfInfo.registerResult(op.getResult()); isNamespaceEnabled = Trilean.getTrilean(true); perfInfo.registerSuccess(true); @@ -353,7 +352,7 @@ public Hashtable getPathStatus(final Path path) throws AzureBlob path); final Hashtable parsedXmsProperties; - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + final AbfsRestOperation op = client.getPathStatus(getRelativePath(path), true); perfInfo.registerResult(op.getResult()); final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); @@ -379,7 +378,7 @@ public void setPathProperties(final Path path, final Hashtable p } catch (CharacterCodingException ex) { throw new InvalidAbfsRestOperationException(ex); } - final AbfsRestOperation op = client.setPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), commaSeparatedProperties); + final AbfsRestOperation op = client.setPathProperties(getRelativePath(path), commaSeparatedProperties); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -418,7 +417,9 @@ public OutputStream createFile(final Path path, umask.toString(), isNamespaceEnabled); - final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.createPath(relativePath, true, overwrite, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -426,14 +427,14 @@ public OutputStream createFile(final Path path, return new AbfsOutputStream( client, statistics, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + relativePath, 0, populateAbfsOutputStreamContext()); } } private AbfsOutputStreamContext populateAbfsOutputStreamContext() { - return new AbfsOutputStreamContext() + return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) .enableFlush(abfsConfiguration.isFlushEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) @@ -452,7 +453,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina umask, isNamespaceEnabled); - final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, + final AbfsRestOperation op = client.createPath(getRelativePath(path), false, true, isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -466,7 +467,9 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist client.getFileSystem(), path); - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getPathStatus(relativePath, false); perfInfo.registerResult(op.getResult()); final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); @@ -485,14 +488,14 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist // Add statistics for InputStream return new AbfsInputStream(client, statistics, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, + relativePath, contentLength, populateAbfsInputStreamContext(), eTag); } } private AbfsInputStreamContext populateAbfsInputStreamContext() { - return new AbfsInputStreamContext() + return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withReadBufferSize(abfsConfiguration.getReadBufferSize()) .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) @@ -507,7 +510,9 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic path, overwrite); - final AbfsRestOperation op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getPathStatus(relativePath, false); perfInfo.registerResult(op.getResult()); final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); @@ -528,7 +533,7 @@ public OutputStream openFileForWrite(final Path path, final FileSystem.Statistic return new AbfsOutputStream( client, statistics, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + relativePath, offset, populateAbfsOutputStreamContext()); } @@ -552,10 +557,13 @@ public void rename(final Path source, final Path destination) throws String continuation = null; + String sourceRelativePath = getRelativePath(source); + String destinationRelativePath = getRelativePath(destination); + do { try (AbfsPerfInfo perfInfo = startTracking("rename", "renamePath")) { - AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); + AbfsRestOperation op = client.renamePath(sourceRelativePath, + destinationRelativePath, continuation); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); perfInfo.registerSuccess(true); @@ -582,10 +590,12 @@ public void delete(final Path path, final boolean recursive) String continuation = null; + String relativePath = getRelativePath(path); + do { try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) { AbfsRestOperation op = client.deletePath( - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); + relativePath, recursive, continuation); perfInfo.registerResult(op.getResult()); continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); perfInfo.registerSuccess(true); @@ -611,14 +621,14 @@ public FileStatus getFileStatus(final Path path) throws IOException { if (path.isRoot()) { if (isNamespaceEnabled) { perfInfo.registerCallee("getAclStatus"); - op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + AbfsHttpConstants.ROOT_PATH); + op = client.getAclStatus(getRelativePath(path)); } else { perfInfo.registerCallee("getFilesystemProperties"); op = client.getFilesystemProperties(); } } else { perfInfo.registerCallee("getPathStatus"); - op = client.getPathStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + op = client.getPathStatus(getRelativePath(path), false); } perfInfo.registerResult(op.getResult()); @@ -698,14 +708,14 @@ public FileStatus[] listStatus(final Path path, final String startFrom) throws I path, startFrom); - final String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path); + final String relativePath = getRelativePath(path); String continuation = null; // generate continuation token if a valid startFrom is provided. if (startFrom != null && !startFrom.isEmpty()) { continuation = getIsNamespaceEnabled() ? generateContinuationTokenForXns(startFrom) - : generateContinuationTokenForNonXns(path.isRoot() ? ROOT_PATH : relativePath, startFrom); + : generateContinuationTokenForNonXns(relativePath, startFrom); } ArrayList fileStatuses = new ArrayList<>(); @@ -793,12 +803,13 @@ private String generateContinuationTokenForXns(final String firstEntryName) { } // generate continuation token for non-xns account - private String generateContinuationTokenForNonXns(final String path, final String firstEntryName) { + private String generateContinuationTokenForNonXns(String path, final String firstEntryName) { Preconditions.checkArgument(!Strings.isNullOrEmpty(firstEntryName) && !firstEntryName.startsWith(AbfsHttpConstants.ROOT_PATH), "startFrom must be a dir/file name and it can not be a full path"); // Notice: non-xns continuation token requires full path (first "/" is not included) for startFrom + path = AbfsClient.getDirectoryQueryParameter(path); final String startFrom = (path.isEmpty() || path.equals(ROOT_PATH)) ? firstEntryName : path + ROOT_PATH + firstEntryName; @@ -846,8 +857,7 @@ public void setOwner(final Path path, final String owner, final String group) th final String transformedOwner = identityTransformer.transformUserOrGroupForSetRequest(owner); final String transformedGroup = identityTransformer.transformUserOrGroupForSetRequest(group); - final AbfsRestOperation op = client.setOwner( - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + final AbfsRestOperation op = client.setOwner(getRelativePath(path), transformedOwner, transformedGroup); @@ -870,8 +880,7 @@ public void setPermission(final Path path, final FsPermission permission) throws path.toString(), permission.toString()); - final AbfsRestOperation op = client.setPermission( - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + final AbfsRestOperation op = client.setPermission(getRelativePath(path), String.format(AbfsHttpConstants.PERMISSION_FORMAT, permission.toOctal())); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -897,7 +906,9 @@ public void modifyAclEntries(final Path path, final List aclSpec) thro final Map modifyAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); boolean useUpn = AbfsAclHelper.isUpnFormatAclEntries(modifyAclEntries); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), useUpn); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getAclStatus(relativePath, useUpn); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -909,7 +920,7 @@ public void modifyAclEntries(final Path path, final List aclSpec) thro try (AbfsPerfInfo perfInfoSet = startTracking("modifyAclEntries", "setAcl")) { final AbfsRestOperation setAclOp - = client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + = client.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -936,7 +947,9 @@ public void removeAclEntries(final Path path, final List aclSpec) thro final Map removeAclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(removeAclEntries); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getAclStatus(relativePath, isUpnFormat); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -948,7 +961,7 @@ public void removeAclEntries(final Path path, final List aclSpec) thro try (AbfsPerfInfo perfInfoSet = startTracking("removeAclEntries", "setAcl")) { final AbfsRestOperation setAclOp = - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + client.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -970,7 +983,9 @@ public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemExceptio client.getFileSystem(), path.toString()); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getAclStatus(relativePath); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); final Map aclEntries = AbfsAclHelper.deserializeAclSpec(op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_ACL)); @@ -988,7 +1003,7 @@ public void removeDefaultAcl(final Path path) throws AzureBlobFileSystemExceptio try (AbfsPerfInfo perfInfoSet = startTracking("removeDefaultAcl", "setAcl")) { final AbfsRestOperation setAclOp = - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + client.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -1010,7 +1025,9 @@ public void removeAcl(final Path path) throws AzureBlobFileSystemException { client.getFileSystem(), path.toString()); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getAclStatus(relativePath); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1025,7 +1042,7 @@ public void removeAcl(final Path path) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfoSet = startTracking("removeAcl", "setAcl")) { final AbfsRestOperation setAclOp = - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + client.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(newAclEntries), eTag); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -1052,7 +1069,9 @@ public void setAcl(final Path path, final List aclSpec) throws AzureBl final Map aclEntries = AbfsAclHelper.deserializeAclSpec(AclEntry.aclSpecToString(aclSpec)); final boolean isUpnFormat = AbfsAclHelper.isUpnFormatAclEntries(aclEntries); - final AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), isUpnFormat); + String relativePath = getRelativePath(path); + + final AbfsRestOperation op = client.getAclStatus(relativePath, isUpnFormat); perfInfoGet.registerResult(op.getResult()); final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); @@ -1064,7 +1083,7 @@ public void setAcl(final Path path, final List aclSpec) throws AzureBl try (AbfsPerfInfo perfInfoSet = startTracking("setAcl", "setAcl")) { final AbfsRestOperation setAclOp = - client.setAcl(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true), + client.setAcl(relativePath, AbfsAclHelper.serializeAclSpec(aclEntries), eTag); perfInfoSet.registerResult(setAclOp.getResult()) .registerSuccess(true) @@ -1086,7 +1105,7 @@ public AclStatus getAclStatus(final Path path) throws IOException { client.getFileSystem(), path.toString()); - AbfsRestOperation op = client.getAclStatus(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true)); + AbfsRestOperation op = client.getAclStatus(getRelativePath(path)); AbfsHttpOperation result = op.getResult(); perfInfo.registerResult(result); @@ -1130,10 +1149,8 @@ public void access(final Path path, final FsAction mode) return; } try (AbfsPerfInfo perfInfo = startTracking("access", "checkAccess")) { - String relativePath = - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path, true); final AbfsRestOperation op = this.client - .checkAccess(relativePath, mode.SYMBOL); + .checkAccess(getRelativePath(path), mode.SYMBOL); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -1201,22 +1218,8 @@ private String getOctalNotation(FsPermission fsPermission) { } private String getRelativePath(final Path path) { - return getRelativePath(path, false); - } - - private String getRelativePath(final Path path, final boolean allowRootPath) { Preconditions.checkNotNull(path, "path"); - final String relativePath = path.toUri().getPath(); - - if (relativePath.length() == 0 || (relativePath.length() == 1 && relativePath.charAt(0) == Path.SEPARATOR_CHAR)) { - return allowRootPath ? AbfsHttpConstants.ROOT_PATH : AbfsHttpConstants.EMPTY_STRING; - } - - if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) { - return relativePath.substring(1); - } - - return relativePath; + return path.toUri().getPath(); } private long parseContentLength(final String contentLength) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 597f93fcfb..5794d32f46 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -142,5 +142,8 @@ public static String accountProperty(String property, String account) { /** Key for SAS token provider **/ public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type"; + /** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. **/ + public static final String FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS = "fs.azure.sas.token.renew.period.for.streams"; + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index ef2e708d5a..01d5202cc2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -76,6 +76,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_USE_UPN = false; public static final boolean DEFAULT_ENABLE_CHECK_ACCESS = false; public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false; + public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java index 9cfe2bc12e..2cd44f1b90 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java @@ -32,22 +32,23 @@ @InterfaceStability.Unstable public interface SASTokenProvider { - String CONCAT_SOURCE_OPERATION = "concat-source"; - String CONCAT_TARGET_OPERATION = "concat-target"; - String CREATEFILE_OPERATION = "create"; + String CHECK_ACCESS_OPERATION = "check-access"; + String CREATE_FILE_OPERATION = "create-file"; String DELETE_OPERATION = "delete"; - String EXECUTE_OPERATION = "execute"; - String GETACL_OPERATION = "getaclstatus"; - String GETFILESTATUS_OPERATION = "getfilestatus"; - String LISTSTATUS_OPERATION = "liststatus"; - String MKDIR_OPERATION = "mkdir"; + String DELETE_RECURSIVE_OPERATION = "delete-recursive"; + String GET_ACL_OPERATION = "get-acl"; + String GET_STATUS_OPERATION = "get-status"; + String GET_PROPERTIES_OPERATION = "get-properties"; + String LIST_OPERATION = "list"; + String CREATE_DIRECTORY_OPERATION = "create-directory"; String READ_OPERATION = "read"; String RENAME_SOURCE_OPERATION = "rename-source"; String RENAME_DESTINATION_OPERATION = "rename-destination"; - String SETACL_OPERATION = "setacl"; - String SETOWNER_OPERATION = "setowner"; - String SETPERMISSION_OPERATION = "setpermission"; - String APPEND_OPERATION = "write"; + String SET_ACL_OPERATION = "set-acl"; + String SET_OWNER_OPERATION = "set-owner"; + String SET_PERMISSION_OPERATION = "set-permission"; + String SET_PROPERTIES_OPERATION = "set-properties"; + String WRITE_OPERATION = "write"; /** * Initialize authorizer for Azure Blob File System. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java index f836bab766..435335fc20 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -52,6 +52,7 @@ public final class AzureADAuthenticator { private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class); private static final String RESOURCE_NAME = "https://storage.azure.com/"; + private static final String SCOPE = "https://storage.azure.com/.default"; private static final int CONNECT_TIMEOUT = 30 * 1000; private static final int READ_TIMEOUT = 30 * 1000; @@ -85,9 +86,14 @@ public static AzureADToken getTokenUsingClientCreds(String authEndpoint, Preconditions.checkNotNull(authEndpoint, "authEndpoint"); Preconditions.checkNotNull(clientId, "clientId"); Preconditions.checkNotNull(clientSecret, "clientSecret"); + boolean isVersion2AuthenticationEndpoint = authEndpoint.contains("/oauth2/v2.0/"); QueryParams qp = new QueryParams(); - qp.add("resource", RESOURCE_NAME); + if (isVersion2AuthenticationEndpoint) { + qp.add("scope", SCOPE); + } else { + qp.add("resource", RESOURCE_NAME); + } qp.add("grant_type", "client_credentials"); qp.add("client_id", clientId); qp.add("client_secret", clientSecret); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 6bacde4866..70d139917e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -29,6 +29,7 @@ import java.util.Locale; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; @@ -207,13 +208,12 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? AbfsHttpConstants.EMPTY_STRING - : relativePath); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, getDirectoryQueryParameter(relativePath)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); - appendSASTokenToQuery(relativePath, SASTokenProvider.LISTSTATUS_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -279,8 +279,8 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); String operation = isFile - ? SASTokenProvider.CREATEFILE_OPERATION - : SASTokenProvider.MKDIR_OPERATION; + ? SASTokenProvider.CREATE_FILE_OPERATION + : SASTokenProvider.CREATE_DIRECTORY_OPERATION; appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); @@ -325,7 +325,7 @@ public AbfsRestOperation renamePath(String source, final String destination, fin } public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, - final int length) throws AzureBlobFileSystemException { + final int length, final String cachedSasToken) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. @@ -335,7 +335,9 @@ public AbfsRestOperation append(final String path, final long position, final by final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); - appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder); + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -343,12 +345,13 @@ public AbfsRestOperation append(final String path, final long position, final by this, HTTP_METHOD_PUT, url, - requestHeaders, buffer, offset, length); + requestHeaders, buffer, offset, length, sasTokenForReuse); op.execute(); return op; } - public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose) + public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, + boolean isClose, final String cachedSasToken) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use @@ -361,7 +364,9 @@ public AbfsRestOperation flush(final String path, final long position, boolean r abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); - appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder); + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, + abfsUriQueryBuilder, cachedSasToken); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -369,7 +374,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r this, HTTP_METHOD_PUT, url, - requestHeaders); + requestHeaders, sasTokenForReuse); op.execute(); return op; } @@ -386,6 +391,7 @@ public AbfsRestOperation setPathProperties(final String path, final String prope final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION); + appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -398,12 +404,20 @@ public AbfsRestOperation setPathProperties(final String path, final String prope return op; } - public AbfsRestOperation getPathStatus(final String path) throws AzureBlobFileSystemException { + public AbfsRestOperation getPathStatus(final String path, final boolean includeProperties) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + String operation = SASTokenProvider.GET_PROPERTIES_OPERATION; + if (!includeProperties) { + // The default action (operation) is implicitly to get properties and this action requires read permission + // because it reads user defined properties. If the action is getStatus or getAclStatus, then + // only traversal (execute) permission is required. + abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS); + operation = SASTokenProvider.GET_STATUS_OPERATION; + } abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed())); - appendSASTokenToQuery(path, SASTokenProvider.GETFILESTATUS_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -417,14 +431,16 @@ public AbfsRestOperation getPathStatus(final String path) throws AzureBlobFileSy } public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength, final String eTag) throws AzureBlobFileSystemException { + final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); requestHeaders.add(new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1))); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, abfsUriQueryBuilder); + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance + String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, + abfsUriQueryBuilder, cachedSasToken); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); @@ -436,7 +452,7 @@ public AbfsRestOperation read(final String path, final long position, final byte requestHeaders, buffer, bufferOffset, - bufferLength); + bufferLength, sasTokenForReuse); op.execute(); return op; @@ -449,7 +465,8 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive, final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); - appendSASTokenToQuery(path, SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder); + String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION; + appendSASTokenToQuery(path, operation, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -479,7 +496,7 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SETOWNER_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -504,7 +521,7 @@ public AbfsRestOperation setPermission(final String path, final String permissio final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SETPERMISSION_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -537,7 +554,7 @@ public AbfsRestOperation setAcl(final String path, final String aclSpecString, f final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL); - appendSASTokenToQuery(path, SASTokenProvider.SETACL_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -560,7 +577,7 @@ public AbfsRestOperation getAclStatus(final String path, final boolean useUPN) t final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL); abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN)); - appendSASTokenToQuery(path, SASTokenProvider.GETACL_OPERATION, abfsUriQueryBuilder); + appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( @@ -587,6 +604,7 @@ public AbfsRestOperation checkAccess(String path, String rwx) AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS); abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx); + appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder); URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); AbfsRestOperation op = new AbfsRestOperation( AbfsRestOperationType.CheckAccess, this, @@ -595,23 +613,65 @@ public AbfsRestOperation checkAccess(String path, String rwx) return op; } + /** + * Get the directory query parameter used by the List Paths REST API and used + * as the path in the continuation token. If the input path is null or the + * root path "/", empty string is returned. If the input path begins with '/', + * the return value is the substring beginning at offset 1. Otherwise, the + * input path is returned. + * @param path the path to be listed. + * @return the value of the directory query parameter + */ + public static String getDirectoryQueryParameter(final String path) { + String directory = path; + if (Strings.isNullOrEmpty(directory)) { + directory = AbfsHttpConstants.EMPTY_STRING; + } else if (directory.charAt(0) == '/') { + directory = directory.substring(1); + } + return directory; + } + /** * If configured for SAS AuthType, appends SAS token to queryBuilder * @param path * @param operation * @param queryBuilder + * @return sasToken - returned for optional re-use. * @throws SASTokenProviderException */ - private void appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException { + private String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException { + return appendSASTokenToQuery(path, operation, queryBuilder, null); + } + + /** + * If configured for SAS AuthType, appends SAS token to queryBuilder + * @param path + * @param operation + * @param queryBuilder + * @param cachedSasToken - previously acquired SAS token to be reused. + * @return sasToken - returned for optional re-use. + * @throws SASTokenProviderException + */ + private String appendSASTokenToQuery(String path, + String operation, + AbfsUriQueryBuilder queryBuilder, + String cachedSasToken) + throws SASTokenProviderException { + String sasToken = null; if (this.authType == AuthType.SAS) { try { LOG.trace("Fetch SAS token for {} on {}", operation, path); - String sasToken = sasTokenProvider.getSASToken(this.accountName, - this.filesystem, path, operation); - if ((sasToken == null) || sasToken.isEmpty()) { - throw new UnsupportedOperationException("SASToken received is empty or null"); + if (cachedSasToken == null) { + sasToken = sasTokenProvider.getSASToken(this.accountName, + this.filesystem, path, operation); + if ((sasToken == null) || sasToken.isEmpty()) { + throw new UnsupportedOperationException("SASToken received is empty or null"); + } + } else { + sasToken = cachedSasToken; + LOG.trace("Using cached SAS token."); } - queryBuilder.setSASToken(sasToken); LOG.trace("SAS token fetch complete for {} on {}", operation, path); } catch (Exception ex) { @@ -621,6 +681,7 @@ private void appendSASTokenToQuery(String path, String operation, AbfsUriQueryBu ex.toString())); } } + return sasToken; } private URL createRequestUrl(final String query) throws AzureBlobFileSystemException { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 05c093a880..422fa3b2f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -51,6 +52,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; + // SAS tokens can be re-used until they expire + private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server @@ -76,6 +79,8 @@ public AbfsInputStream( this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; + this.cachedSasToken = new CachedSASToken( + abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); } public String getPath() { @@ -234,7 +239,8 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { - op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag); + op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); incrementReadOps(); } catch (AzureBlobFileSystemException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index cba7191016..a847b56eab 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -29,7 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean tolerateOobAppends; - public AbfsInputStreamContext() { + public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { + super(sasTokenRenewPeriodForStreamsInSeconds); } public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 4297b18651..89afca4220 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FSExceptionMessages; @@ -73,6 +74,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; + // SAS tokens can be re-used until they expire + private CachedSASToken cachedSasToken; + /** * Queue storing buffers with the size of the Azure block ready for * reuse. The pool allows reusing the blocks instead of allocating new @@ -119,6 +123,8 @@ public AbfsOutputStream( TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + this.cachedSasToken = new CachedSASToken( + abfsOutputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds()); } /** @@ -330,7 +336,8 @@ public Void call() throws Exception { try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength); + bytesLength, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); perfInfo.registerSuccess(true); @@ -385,7 +392,8 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { - AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose); + AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get()); + cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index e0aefbf33b..dcd6c45981 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -31,7 +31,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private AbfsOutputStreamStatistics streamStatistics; - public AbfsOutputStreamContext() { + public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { + super(sasTokenRenewPeriodForStreamsInSeconds); } public AbfsOutputStreamContext withWriteBufferSize( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 445c366543..2f9ab88017 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -53,6 +53,9 @@ public class AbfsRestOperation { // request body and all the download methods have a response body. private final boolean hasRequestBody; + // Used only by AbfsInputStream/AbfsOutputStream to reuse SAS tokens. + private final String sasToken; + private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); // For uploads, this is the request entity body. For downloads, @@ -67,6 +70,10 @@ public AbfsHttpOperation getResult() { return result; } + String getSasToken() { + return sasToken; + } + /** * Initializes a new REST operation. * @@ -80,6 +87,24 @@ public AbfsHttpOperation getResult() { final String method, final URL url, final List requestHeaders) { + this(operationType, client, method, url, requestHeaders, null); + } + + /** + * Initializes a new REST operation. + * + * @param client The Blob FS client. + * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). + * @param url The full URL including query string parameters. + * @param requestHeaders The HTTP request headers. + * @param sasToken A sasToken for optional re-use by AbfsInputStream/AbfsOutputStream. + */ + AbfsRestOperation(final AbfsRestOperationType operationType, + final AbfsClient client, + final String method, + final URL url, + final List requestHeaders, + final String sasToken) { this.operationType = operationType; this.client = client; this.method = method; @@ -87,6 +112,7 @@ public AbfsHttpOperation getResult() { this.requestHeaders = requestHeaders; this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); + this.sasToken = sasToken; } /** @@ -101,6 +127,7 @@ public AbfsHttpOperation getResult() { * this will hold the response entity body. * @param bufferOffset An offset into the buffer where the data beings. * @param bufferLength The length of the data in the buffer. + * @param sasToken A sasToken for optional re-use by AbfsInputStream/AbfsOutputStream. */ AbfsRestOperation(AbfsRestOperationType operationType, AbfsClient client, @@ -109,8 +136,9 @@ public AbfsHttpOperation getResult() { List requestHeaders, byte[] buffer, int bufferOffset, - int bufferLength) { - this(operationType, client, method, url, requestHeaders); + int bufferLength, + String sasToken) { + this(operationType, client, method, url, requestHeaders, sasToken); this.buffer = buffer; this.bufferOffset = bufferOffset; this.bufferLength = bufferLength; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java index ee77f595fe..9cd858cde8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java @@ -23,4 +23,17 @@ * to store common configs among input and output streams. */ public abstract class AbfsStreamContext { + private long sasTokenRenewPeriodForStreamsInSeconds; + + // hide default constructor + private AbfsStreamContext() { + } + + public AbfsStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) { + this.sasTokenRenewPeriodForStreamsInSeconds = sasTokenRenewPeriodForStreamsInSeconds; + } + + public long getSasTokenRenewPeriodForStreamsInSeconds() { + return sasTokenRenewPeriodForStreamsInSeconds; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java new file mode 100644 index 0000000000..620b4c0380 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/CachedSASToken.java @@ -0,0 +1,207 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS; +import static java.time.temporal.ChronoUnit.SECONDS; + +/** + * CachedSASToken provides simple utility for managing renewal + * of SAS tokens used by Input/OutputStream. This enables SAS re-use + * and reduces calls to the SASTokenProvider. + */ +public final class CachedSASToken { + public static final Logger LOG = LoggerFactory.getLogger(CachedSASToken.class); + private final long minExpirationInSeconds; + private String sasToken; + private OffsetDateTime sasExpiry; + + /** + * Create instance with default minimum expiration. SAS tokens are + * automatically renewed when their expiration is within this period. + */ + public CachedSASToken() { + this(DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS); + } + + /** + * Create instance with specified minimum expiration. SAS tokens are + * automatically renewed when their expiration is within this period. + * @param minExpirationInSeconds + */ + public CachedSASToken(long minExpirationInSeconds) { + this.minExpirationInSeconds = minExpirationInSeconds; + } + + /** + * Checks if the SAS token is expired or near expiration. + * @param expiry + * @param minExpiryInSeconds + * @return true if the SAS is near sasExpiry; otherwise false + */ + private static boolean isNearExpiry(OffsetDateTime expiry, long minExpiryInSeconds) { + if (expiry == OffsetDateTime.MIN) { + return true; + } + OffsetDateTime utcNow = OffsetDateTime.now(ZoneOffset.UTC); + return utcNow.until(expiry, SECONDS) <= minExpiryInSeconds; + } + + /** + * Parse the sasExpiry from the SAS token. The sasExpiry is the minimum + * of the ske and se parameters. The se parameter is required and the + * ske parameter is optional. + * @param token an Azure Storage SAS token + * @return the sasExpiry or OffsetDateTime.MIN if invalid. + */ + private static OffsetDateTime getExpiry(String token) { + // return MIN for all invalid input, including a null token + if (token == null) { + return OffsetDateTime.MIN; + } + + String signedExpiry = "se="; + int signedExpiryLen = 3; + + int start = token.indexOf(signedExpiry); + + // return MIN if the required se parameter is absent + if (start == -1) { + return OffsetDateTime.MIN; + } + + start += signedExpiryLen; + + // extract the value of se parameter + int end = token.indexOf("&", start); + String seValue = (end == -1) ? token.substring(start) : token.substring(start, end); + + try { + seValue = URLDecoder.decode(seValue, "utf-8"); + } catch (UnsupportedEncodingException ex) { + LOG.error("Error decoding se query parameter ({}) from SAS.", seValue, ex); + return OffsetDateTime.MIN; + } + + // parse the ISO 8601 date value; return MIN if invalid + OffsetDateTime seDate = OffsetDateTime.MIN; + try { + seDate = OffsetDateTime.parse(seValue, DateTimeFormatter.ISO_DATE_TIME); + } catch (DateTimeParseException ex) { + LOG.error("Error parsing se query parameter ({}) from SAS.", seValue, ex); + } + + String signedKeyExpiry = "ske="; + int signedKeyExpiryLen = 4; + + // if ske is present, the sasExpiry is the minimum of ske and se + start = token.indexOf(signedKeyExpiry); + + // return seDate if ske is absent + if (start == -1) { + return seDate; + } + + start += signedKeyExpiryLen; + + // extract the value of ske parameter + end = token.indexOf("&", start); + String skeValue = (end == -1) ? token.substring(start) : token.substring(start, end); + + try { + skeValue = URLDecoder.decode(skeValue, "utf-8"); + } catch (UnsupportedEncodingException ex) { + LOG.error("Error decoding ske query parameter ({}) from SAS.", skeValue, ex); + return OffsetDateTime.MIN; + } + + // parse the ISO 8601 date value; return MIN if invalid + OffsetDateTime skeDate = OffsetDateTime.MIN; + try { + skeDate = OffsetDateTime.parse(skeValue, DateTimeFormatter.ISO_DATE_TIME); + } catch (DateTimeParseException ex) { + LOG.error("Error parsing ske query parameter ({}) from SAS.", skeValue, ex); + return OffsetDateTime.MIN; + } + + return skeDate.isBefore(seDate) ? skeDate : seDate; + } + + /** + * Updates the cached SAS token and expiry. If the token is invalid, the cached value + * is cleared by setting it to null and the expiry to MIN. + * @param token an Azure Storage SAS token + */ + public void update(String token) { + // quickly return if token and cached sasToken are the same reference + // Note: use of operator == is intentional + if (token == sasToken) { + return; + } + OffsetDateTime newExpiry = getExpiry(token); + boolean isInvalid = isNearExpiry(newExpiry, minExpirationInSeconds); + synchronized (this) { + if (isInvalid) { + sasToken = null; + sasExpiry = OffsetDateTime.MIN; + } else { + sasToken = token; + sasExpiry = newExpiry; + } + } + } + + /** + * Gets the token if still valid. + * @return the token or null if it is expired or near sasExpiry. + */ + public String get() { + // quickly return null if not set + if (sasToken == null) { + return null; + } + String token; + OffsetDateTime exp; + synchronized (this) { + token = sasToken; + exp = sasExpiry; + } + boolean isInvalid = isNearExpiry(exp, minExpirationInSeconds); + return isInvalid ? null : token; + } + + @VisibleForTesting + void setForTesting(String token, OffsetDateTime expiry) { + synchronized (this) { + sasToken = token; + sasExpiry = expiry; + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 2b4b14d264..89f52e7e84 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -313,6 +313,7 @@ driven by them. 1. Using OAuth 2.0 tokens of one form or another. 1. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application, "Managed Instance". +1. Using Shared Access Signature (SAS) tokens provided by a custom implementation of the SASTokenProvider interface. What can be changed is what secrets/credentials are used to authenticate the caller. @@ -541,6 +542,24 @@ and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`. The declared class also holds responsibility to implement retry logic while fetching access tokens. +### Shared Access Signature (SAS) Token Provider + +A Shared Access Signature (SAS) token provider supplies the ABFS connector with SAS +tokens by implementing the SASTokenProvider interface. + +```xml + + fs.azure.account.auth.type + SAS + + + fs.azure.sas.token.provider.type + {fully-qualified-class-name-for-implementation-of-SASTokenProvider-interface} + +``` + +The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`. + ## Technical notes ### Proxy setup diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md index a26da839f0..87cbb97aa0 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md @@ -646,7 +646,7 @@ hierarchical namespace enabled, and set the following configuration settings: fs.azure.account.auth.type.{YOUR_ABFS_ACCOUNT_NAME} {AUTH TYPE} - The authorization type can be SharedKey, OAuth, or Custom. The + The authorization type can be SharedKey, OAuth, Custom or SAS. The default is SharedKey. @@ -793,6 +793,79 @@ hierarchical namespace enabled, and set the following configuration settings: --> +``` +To run Delegation SAS test cases you must use a storage account with the +hierarchical namespace enabled and set the following configuration settings: + +```xml + + + + + + fs.azure.sas.token.provider.type + org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider + The fully qualified class name of the SAS token provider implementation. + + + + fs.azure.test.app.service.principal.tenant.id + {TID} + Tenant ID for the application's service principal. + + + + fs.azure.test.app.service.principal.object.id + {OID} + Object ID for the application's service principal. + + + + fs.azure.test.app.id + {app id} + The application's ID, also known as the client id. + + + + fs.azure.test.app.secret + {client secret} + The application's secret, also known as the client secret. + + + ``` If running tests against an endpoint that uses the URL format diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 87a3dcd881..f41cbd6318 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.Callable; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +80,7 @@ public abstract class AbstractAbfsIntegrationTest extends private String testUrl; private AuthType authType; private boolean useConfiguredFileSystem = false; + private boolean usingFilesystemForSASTests = false; protected AbstractAbfsIntegrationTest() throws Exception { fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString(); @@ -175,8 +177,13 @@ public void teardown() throws Exception { return; } - // Delete all uniquely created filesystem from the account - if (!useConfiguredFileSystem) { + if (usingFilesystemForSASTests) { + abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey.name()); + AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig); + tempFs.getAbfsStore().deleteFilesystem(); + } + else if (!useConfiguredFileSystem) { + // Delete all uniquely created filesystem from the account final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore(); abfsStore.deleteFilesystem(); @@ -225,6 +232,16 @@ public void loadConfiguredFileSystem() throws Exception { useConfiguredFileSystem = true; } + protected void createFilesystemForSASTests() throws Exception { + // The SAS tests do not have permission to create a filesystem + // so first create temporary instance of the filesystem using SharedKey + // then re-use the filesystem it creates with SAS auth instead of SharedKey. + AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig); + Assert.assertTrue(tempFs.exists(new Path("/"))); + abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name()); + usingFilesystemForSASTests = true; + } + public AzureBlobFileSystem getFileSystem() throws IOException { return abfs; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java index 94e0ce3f48..1278e652b3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAuthorization.java @@ -209,46 +209,55 @@ public void testGetFileStatusUnauthorized() throws Exception { @Test public void testSetOwnerUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.SetOwner, true); } @Test public void testSetPermissionUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.SetPermissions, true); } @Test public void testModifyAclEntriesUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.ModifyAclEntries, true); } @Test public void testRemoveAclEntriesUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.RemoveAclEntries, true); } @Test public void testRemoveDefaultAclUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.RemoveDefaultAcl, true); } @Test public void testRemoveAclUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.RemoveAcl, true); } @Test public void testSetAclUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.SetAcl, true); } @Test public void testGetAclStatusAuthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.GetAcl, false); } @Test public void testGetAclStatusUnauthorized() throws Exception { + Assume.assumeTrue(this.getFileSystem().getIsNamespaceEnabled()); runTest(FileSystemOperations.GetAcl, true); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java index bc5fc59d9d..4189d666e7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCheckAccess.java @@ -273,6 +273,8 @@ private void assumeHNSAndCheckAccessEnabled() { isHNSEnabled); Assume.assumeTrue(FS_AZURE_ENABLE_CHECK_ACCESS + " is false", isCheckAccessEnabled); + + Assume.assumeNotNull(getRawConfiguration().get(FS_AZURE_BLOB_FS_CLIENT_ID)); } private void assertAccessible(Path testFilePath, FsAction fsAction) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java new file mode 100644 index 0000000000..07b5804d11 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import org.junit.Assume; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AuthType; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.AccessControlException; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE; +import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry; +import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS; +import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT; +import static org.apache.hadoop.fs.permission.AclEntryType.GROUP; +import static org.apache.hadoop.fs.permission.AclEntryType.USER; + +/** + * Test Perform Authorization Check operation + */ +public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrationTest { + private static final String TEST_GROUP = UUID.randomUUID().toString(); + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAzureBlobFileSystemDelegationSAS.class); + + public ITestAzureBlobFileSystemDelegationSAS() throws Exception { + // These tests rely on specific settings in azure-auth-keys.xml: + String sasProvider = getRawConfiguration().get(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE); + Assume.assumeTrue(MockDelegationSASTokenProvider.class.getCanonicalName().equals(sasProvider)); + Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID)); + Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET)); + Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID)); + Assume.assumeNotNull(getRawConfiguration().get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID)); + // The test uses shared key to create a random filesystem and then creates another + // instance of this filesystem using SAS authorization. + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); + } + + @Override + public void setup() throws Exception { + boolean isHNSEnabled = this.getConfiguration().getBoolean( + TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false); + Assume.assumeTrue(isHNSEnabled); + createFilesystemForSASTests(); + super.setup(); + } + + @Test + // Test filesystem operations access, create, mkdirs, setOwner, getFileStatus + public void testCheckAccess() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + + Path rootPath = new Path("/"); + fs.setPermission(rootPath, new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.EXECUTE)); + FileStatus rootStatus = fs.getFileStatus(rootPath); + assertEquals("The directory permissions are not expected.", "rwxr-x--x", rootStatus.getPermission().toString()); + + Path dirPath = new Path(UUID.randomUUID().toString()); + fs.mkdirs(dirPath); + fs.setOwner(dirPath, MockDelegationSASTokenProvider.TEST_OWNER, null); + + Path filePath = new Path(dirPath, "file1"); + fs.create(filePath).close(); + fs.setPermission(filePath, new FsPermission(FsAction.READ, FsAction.READ, FsAction.NONE)); + + FileStatus dirStatus = fs.getFileStatus(dirPath); + FileStatus fileStatus = fs.getFileStatus(filePath); + + assertEquals("The owner is not expected.", MockDelegationSASTokenProvider.TEST_OWNER, dirStatus.getOwner()); + assertEquals("The owner is not expected.", MockDelegationSASTokenProvider.TEST_OWNER, fileStatus.getOwner()); + assertEquals("The directory permissions are not expected.", "rwxr-xr-x", dirStatus.getPermission().toString()); + assertEquals("The file permissions are not expected.", "r--r-----", fileStatus.getPermission().toString()); + + assertTrue(isAccessible(fs, dirPath, FsAction.READ_WRITE)); + assertFalse(isAccessible(fs, filePath, FsAction.READ_WRITE)); + + fs.setPermission(filePath, new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE)); + fileStatus = fs.getFileStatus(filePath); + assertEquals("The file permissions are not expected.", "rw-r-----", fileStatus.getPermission().toString()); + assertTrue(isAccessible(fs, filePath, FsAction.READ_WRITE)); + + fs.setPermission(dirPath, new FsPermission(FsAction.EXECUTE, FsAction.NONE, FsAction.NONE)); + dirStatus = fs.getFileStatus(dirPath); + assertEquals("The file permissions are not expected.", "--x------", dirStatus.getPermission().toString()); + assertFalse(isAccessible(fs, dirPath, FsAction.READ_WRITE)); + assertTrue(isAccessible(fs, dirPath, FsAction.EXECUTE)); + + fs.setPermission(dirPath, new FsPermission(FsAction.NONE, FsAction.NONE, FsAction.NONE)); + dirStatus = fs.getFileStatus(dirPath); + assertEquals("The file permissions are not expected.", "---------", dirStatus.getPermission().toString()); + assertFalse(isAccessible(fs, filePath, FsAction.READ_WRITE)); + } + + private boolean isAccessible(FileSystem fs, Path path, FsAction fsAction) + throws IOException { + try { + fs.access(path, fsAction); + } catch (AccessControlException ace) { + return false; + } + return true; + } + + @Test + // Test filesystem operations create, create with overwrite, append and open. + // Test output stream operation write, flush and close + // Test input stream operation, read + public void testReadAndWrite() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path reqPath = new Path(UUID.randomUUID().toString()); + + final String msg1 = "purple"; + final String msg2 = "yellow"; + int expectedFileLength = msg1.length() * 2; + + byte[] readBuffer = new byte[1024]; + + // create file with content "purplepurple" + try (FSDataOutputStream stream = fs.create(reqPath)) { + stream.writeBytes(msg1); + stream.hflush(); + stream.writeBytes(msg1); + } + + // open file and verify content is "purplepurple" + try (FSDataInputStream stream = fs.open(reqPath)) { + int bytesRead = stream.read(readBuffer, 0, readBuffer.length); + assertEquals(expectedFileLength, bytesRead); + String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8); + assertEquals(msg1 + msg1, fileContent); + } + + // overwrite file with content "yellowyellow" + try (FSDataOutputStream stream = fs.create(reqPath)) { + stream.writeBytes(msg2); + stream.hflush(); + stream.writeBytes(msg2); + } + + // open file and verify content is "yellowyellow" + try (FSDataInputStream stream = fs.open(reqPath)) { + int bytesRead = stream.read(readBuffer, 0, readBuffer.length); + assertEquals(expectedFileLength, bytesRead); + String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8); + assertEquals(msg2 + msg2, fileContent); + } + + // append to file so final content is "yellowyellowpurplepurple" + try (FSDataOutputStream stream = fs.append(reqPath)) { + stream.writeBytes(msg1); + stream.hflush(); + stream.writeBytes(msg1); + } + + // open file and verify content is "yellowyellowpurplepurple" + try (FSDataInputStream stream = fs.open(reqPath)) { + int bytesRead = stream.read(readBuffer, 0, readBuffer.length); + assertEquals(2 * expectedFileLength, bytesRead); + String fileContent = new String(readBuffer, 0, bytesRead, StandardCharsets.UTF_8); + assertEquals(msg2 + msg2 + msg1 + msg1, fileContent); + } + } + + @Test + // Test rename file and rename folder + public void testRename() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path sourceDir = new Path(UUID.randomUUID().toString()); + Path sourcePath = new Path(sourceDir, UUID.randomUUID().toString()); + Path destinationPath = new Path(sourceDir, UUID.randomUUID().toString()); + Path destinationDir = new Path(UUID.randomUUID().toString()); + + // create file with content "hello" + try (FSDataOutputStream stream = fs.create(sourcePath)) { + stream.writeBytes("hello"); + } + + assertFalse(fs.exists(destinationPath)); + fs.rename(sourcePath, destinationPath); + assertFalse(fs.exists(sourcePath)); + assertTrue(fs.exists(destinationPath)); + + assertFalse(fs.exists(destinationDir)); + fs.rename(sourceDir, destinationDir); + assertFalse(fs.exists(sourceDir)); + assertTrue(fs.exists(destinationDir)); + } + + @Test + // Test delete file and delete folder + public void testDelete() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path dirPath = new Path(UUID.randomUUID().toString()); + Path filePath = new Path(dirPath, UUID.randomUUID().toString()); + + // create file with content "hello" + try (FSDataOutputStream stream = fs.create(filePath)) { + stream.writeBytes("hello"); + } + + assertTrue(fs.exists(filePath)); + fs.delete(filePath, false); + assertFalse(fs.exists(filePath)); + + assertTrue(fs.exists(dirPath)); + fs.delete(dirPath, false); + assertFalse(fs.exists(dirPath)); + } + + @Test + // Test delete folder recursive + public void testDeleteRecursive() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path dirPath = new Path(UUID.randomUUID().toString()); + Path filePath = new Path(dirPath, UUID.randomUUID().toString()); + + // create file with content "hello" + try (FSDataOutputStream stream = fs.create(filePath)) { + stream.writeBytes("hello"); + } + + assertTrue(fs.exists(dirPath)); + assertTrue(fs.exists(filePath)); + fs.delete(dirPath, true); + assertFalse(fs.exists(filePath)); + assertFalse(fs.exists(dirPath)); + } + + @Test + // Test list on file, directory and root path + public void testList() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path dirPath = new Path(UUID.randomUUID().toString()); + Path filePath = new Path(dirPath, UUID.randomUUID().toString()); + + fs.mkdirs(dirPath); + + // create file with content "hello" + try (FSDataOutputStream stream = fs.create(filePath)) { + stream.writeBytes("hello"); + } + + fs.listStatus(filePath); + fs.listStatus(dirPath); + fs.listStatus(new Path("/")); + } + + @Test + // Test filesystem operations setAcl, getAclStatus, removeAcl + // setPermissions and getFileStatus + public void testAcl() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path reqPath = new Path(UUID.randomUUID().toString()); + + fs.create(reqPath).close(); + + fs.setAcl(reqPath, Arrays + .asList(aclEntry(ACCESS, GROUP, TEST_GROUP, FsAction.ALL))); + + AclStatus acl = fs.getAclStatus(reqPath); + assertEquals(MockDelegationSASTokenProvider.TEST_OWNER, acl.getOwner()); + assertEquals("[group::r--, group:" + TEST_GROUP + ":rwx]", acl.getEntries().toString()); + + fs.removeAcl(reqPath); + acl = fs.getAclStatus(reqPath); + assertEquals("[]", acl.getEntries().toString()); + + fs.setPermission(reqPath, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + + FileStatus status = fs.getFileStatus(reqPath); + assertEquals("rwx------", status.getPermission().toString()); + + acl = fs.getAclStatus(reqPath); + assertEquals("rwx------", acl.getPermission().toString()); + } + + @Test + // Test getFileStatus and getAclStatus operations on root path + public void testRootPath() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path rootPath = new Path(AbfsHttpConstants.ROOT_PATH); + + FileStatus status = fs.getFileStatus(rootPath); + assertEquals("rwxr-x---", status.getPermission().toString()); + assertTrue(status.isDirectory()); + + AclStatus acl = fs.getAclStatus(rootPath); + assertEquals("rwxr-x---", acl.getPermission().toString()); + + List aclSpec = new ArrayList<>(); + int count = 0; + for (AclEntry entry: acl.getEntries()) { + aclSpec.add(entry); + if (entry.getScope() == AclEntryScope.DEFAULT) { + count++; + } + } + assertEquals(0, count); + + aclSpec.add(aclEntry(DEFAULT, USER, "cd548981-afec-4ab9-9d39-f6f2add2fd9b", FsAction.EXECUTE)); + + fs.modifyAclEntries(rootPath, aclSpec); + + acl = fs.getAclStatus(rootPath); + + count = 0; + for (AclEntry entry: acl.getEntries()) { + aclSpec.add(entry); + if (entry.getScope() == AclEntryScope.DEFAULT) { + count++; + } + } + assertEquals(5, count); + + fs.removeDefaultAcl(rootPath); + + acl = fs.getAclStatus(rootPath); + + count = 0; + for (AclEntry entry: acl.getEntries()) { + aclSpec.add(entry); + if (entry.getScope() == AclEntryScope.DEFAULT) { + count++; + } + } + assertEquals(0, count); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java index c8dcef3ef2..16a3f5703b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java @@ -45,6 +45,14 @@ public final class TestConfigurationKeys { public static final String MOCK_SASTOKENPROVIDER_FAIL_INIT = "mock.sastokenprovider.fail.init"; public static final String MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN = "mock.sastokenprovider.return.empty.sasToken"; + public static final String FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID = "fs.azure.test.app.service.principal.tenant.id"; + + public static final String FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID = "fs.azure.test.app.service.principal.object.id"; + + public static final String FS_AZURE_TEST_APP_ID = "fs.azure.test.app.id"; + + public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret"; + public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml"; public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-"; public static final int TEST_TIMEOUT = 15 * 60 * 1000; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java new file mode 100644 index 0000000000..fa50bef454 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.extensions; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; +import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.utils.Base64; +import org.apache.hadoop.fs.azurebfs.utils.DelegationSASGenerator; +import org.apache.hadoop.fs.azurebfs.utils.SASGenerator; +import org.apache.hadoop.security.AccessControlException; + +/** + * A mock SAS token provider implementation + */ +public class MockDelegationSASTokenProvider implements SASTokenProvider { + + private DelegationSASGenerator generator; + + public static final String TEST_OWNER = "325f1619-4205-432f-9fce-3fd594325ce5"; + public static final String CORRELATION_ID = "66ff4ffc-ff17-417e-a2a9-45db8c5b0b5c"; + + @Override + public void initialize(Configuration configuration, String accountName) throws IOException { + String appID = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_ID); + String appSecret = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SECRET); + String sktid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_TENANT_ID); + String skoid = configuration.get(TestConfigurationKeys.FS_AZURE_TEST_APP_SERVICE_PRINCIPAL_OBJECT_ID); + String skt = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().minusSeconds(SASGenerator.FIVE_MINUTES)); + String ske = SASGenerator.ISO_8601_FORMATTER.format(Instant.now().plusSeconds(SASGenerator.ONE_DAY)); + String skv = SASGenerator.AuthenticationVersion.Dec19.toString(); + + byte[] key = getUserDelegationKey(accountName, appID, appSecret, sktid, skt, ske, skv); + + generator = new DelegationSASGenerator(key, skoid, sktid, skt, ske, skv); + } + + // Invokes the AAD v2.0 authentication endpoint with a client credentials grant to get an + // access token. See https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-client-creds-grant-flow. + private String getAuthorizationHeader(String accountName, String appID, String appSecret, String sktid) throws IOException { + String authEndPoint = String.format("https://login.microsoftonline.com/%s/oauth2/v2.0/token", sktid); + ClientCredsTokenProvider provider = new ClientCredsTokenProvider(authEndPoint, appID, appSecret); + return "Bearer " + provider.getToken().getAccessToken(); + } + + private byte[] getUserDelegationKey(String accountName, String appID, String appSecret, + String sktid, String skt, String ske, String skv) throws IOException { + + String method = "POST"; + String account = accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)); + + final StringBuilder sb = new StringBuilder(128); + sb.append("https://"); + sb.append(account); + sb.append(".blob.core.windows.net/?restype=service&comp=userdelegationkey"); + + URL url; + try { + url = new URL(sb.toString()); + } catch (MalformedURLException ex) { + throw new InvalidUriException(sb.toString()); + } + + List requestHeaders = new ArrayList(); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, skv)); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, "application/x-www-form-urlencoded")); + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.AUTHORIZATION, getAuthorizationHeader(account, appID, appSecret, sktid))); + + final StringBuilder requestBody = new StringBuilder(512); + requestBody.append(""); + requestBody.append(skt); + requestBody.append(""); + requestBody.append(ske); + requestBody.append(""); + + AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders); + + byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString()); + op.sendRequest(requestBuffer, 0, requestBuffer.length); + + byte[] responseBuffer = new byte[4 * 1024]; + op.processResponse(responseBuffer, 0, responseBuffer.length); + + String responseBody = new String(responseBuffer, 0, (int) op.getBytesReceived(), StandardCharsets.UTF_8); + int beginIndex = responseBody.indexOf("") + "".length(); + int endIndex = responseBody.indexOf(""); + String value = responseBody.substring(beginIndex, endIndex); + return Base64.decode(value); + } + + /** + * Invokes the authorizer to obtain a SAS token. + * + * @param accountName the name of the storage account. + * @param fileSystem the name of the fileSystem. + * @param path the file or directory path. + * @param operation the operation to be performed on the path. + * @return a SAS token to perform the request operation. + * @throws IOException if there is a network error. + * @throws AccessControlException if access is denied. + */ + @Override + public String getSASToken(String accountName, String fileSystem, String path, + String operation) throws IOException, AccessControlException { + // The user for these tests is always TEST_OWNER. The check access operation + // requires suoid to check permissions for the user and will throw if the + // user does not have access and otherwise succeed. + String saoid = (operation == SASTokenProvider.CHECK_ACCESS_OPERATION) ? null : TEST_OWNER; + String suoid = (operation == SASTokenProvider.CHECK_ACCESS_OPERATION) ? TEST_OWNER : null; + return generator.getDelegationSAS(accountName, fileSystem, path, operation, + saoid, suoid, CORRELATION_ID); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java index de841b0b29..50ac20970f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.utils.Base64; -import org.apache.hadoop.fs.azurebfs.utils.SASGenerator; +import org.apache.hadoop.fs.azurebfs.utils.ServiceSASGenerator; /** * A mock SAS token provider implementation @@ -33,7 +33,7 @@ public class MockSASTokenProvider implements SASTokenProvider { private byte[] accountKey; - private SASGenerator generator; + private ServiceSASGenerator generator; private boolean skipAuthorizationForTestSetup = false; // For testing we use a container SAS for all operations. @@ -49,7 +49,7 @@ public void initialize(Configuration configuration, String accountName) throws I } catch (Exception ex) { throw new IOException(ex); } - generator = new SASGenerator(accountKey); + generator = new ServiceSASGenerator(accountKey); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java new file mode 100644 index 0000000000..f84ed6ab4c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +import java.time.Instant; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder; + + +/** + * Test Delegation SAS generator. + */ +public class DelegationSASGenerator extends SASGenerator { + private String skoid; + private String sktid; + private String skt; + private String ske; + private final String sks = "b"; + private String skv; + + public DelegationSASGenerator(byte[] userDelegationKey, String skoid, String sktid, String skt, String ske, String skv) { + super(userDelegationKey); + this.skoid = skoid; + this.sktid = sktid; + this.skt = skt; + this.ske = ske; + this.skv = skv; + } + + public String getDelegationSAS(String accountName, String containerName, String path, String operation, + String saoid, String suoid, String scid) { + + final String sv = AuthenticationVersion.Dec19.toString(); + final String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES)); + final String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY)); + String sr = "b"; + String sdd = null; + String sp = null; + + switch (operation) { + case SASTokenProvider.CHECK_ACCESS_OPERATION: + sp = "e"; + break; + case SASTokenProvider.WRITE_OPERATION: + case SASTokenProvider.CREATE_FILE_OPERATION: + case SASTokenProvider.CREATE_DIRECTORY_OPERATION: + sp = "w"; + break; + case SASTokenProvider.DELETE_OPERATION: + sp = "d"; + break; + case SASTokenProvider.DELETE_RECURSIVE_OPERATION: + sp = "d"; + sr = "d"; + sdd = Integer.toString(StringUtils.countMatches(path, "/")); + break; + case SASTokenProvider.GET_ACL_OPERATION: + case SASTokenProvider.GET_STATUS_OPERATION: + sp = "e"; + break; + case SASTokenProvider.LIST_OPERATION: + sp = "l"; + break; + case SASTokenProvider.READ_OPERATION: + sp = "r"; + break; + case SASTokenProvider.RENAME_DESTINATION_OPERATION: + case SASTokenProvider.RENAME_SOURCE_OPERATION: + sp = "m"; + break; + case SASTokenProvider.SET_ACL_OPERATION: + sp = "p"; + break; + case SASTokenProvider.SET_OWNER_OPERATION: + sp = "o"; + break; + case SASTokenProvider.SET_PERMISSION_OPERATION: + sp = "p"; + break; + default: + throw new IllegalArgumentException(operation); + } + + String signature = computeSignatureForSAS(sp, st, se, sv, sr, accountName, containerName, + path, saoid, suoid, scid); + + AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder(); + qb.addQuery("skoid", skoid); + qb.addQuery("sktid", sktid); + qb.addQuery("skt", skt); + qb.addQuery("ske", ske); + qb.addQuery("sks", sks); + qb.addQuery("skv", skv); + if (saoid != null) { + qb.addQuery("saoid", saoid); + } + if (suoid != null) { + qb.addQuery("suoid", suoid); + } + if (scid != null) { + qb.addQuery("scid", scid); + } + qb.addQuery("sp", sp); + qb.addQuery("st", st); + qb.addQuery("se", se); + qb.addQuery("sv", sv); + qb.addQuery("sr", sr); + if (sdd != null) { + qb.addQuery("sdd", sdd); + } + qb.addQuery("sig", signature); + return qb.toString().substring(1); + } + + private String computeSignatureForSAS(String sp, String st, String se, String sv, + String sr, String accountName, String containerName, + String path, String saoid, String suoid, String scid) { + + StringBuilder sb = new StringBuilder(); + sb.append(sp); + sb.append("\n"); + sb.append(st); + sb.append("\n"); + sb.append(se); + sb.append("\n"); + // canonicalized resource + sb.append("/blob/"); + sb.append(accountName); + sb.append("/"); + sb.append(containerName); + if (path != null && sr != "c") { + sb.append(path); + } + sb.append("\n"); + sb.append(skoid); + sb.append("\n"); + sb.append(sktid); + sb.append("\n"); + sb.append(skt); + sb.append("\n"); + sb.append(ske); + sb.append("\n"); + sb.append(sks); + sb.append("\n"); + sb.append(skv); + sb.append("\n"); + if (saoid != null) { + sb.append(saoid); + } + sb.append("\n"); + if (suoid != null) { + sb.append(suoid); + } + sb.append("\n"); + if (scid != null) { + sb.append(scid); + } + sb.append("\n"); + + sb.append("\n"); // sip + sb.append("\n"); // spr + sb.append(sv); + sb.append("\n"); + sb.append(sr); + sb.append("\n"); + sb.append("\n"); // - For optional : rscc - ResponseCacheControl + sb.append("\n"); // - For optional : rscd - ResponseContentDisposition + sb.append("\n"); // - For optional : rsce - ResponseContentEncoding + sb.append("\n"); // - For optional : rscl - ResponseContentLanguage + sb.append("\n"); // - For optional : rsct - ResponseContentType + + String stringToSign = sb.toString(); + LOG.debug("Delegation SAS stringToSign: " + stringToSign.replace("\n", ".")); + return computeHmac256(stringToSign); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java index 19bf9e2c45..34d504a9a0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java @@ -19,104 +19,74 @@ package org.apache.hadoop.fs.azurebfs.utils; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.time.format.DateTimeFormatter; -import java.time.Instant; import java.time.ZoneId; import java.util.Locale; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Test container SAS generator. + * Test SAS generator. */ -public class SASGenerator { +public abstract class SASGenerator { - private static final String HMAC_SHA256 = "HmacSHA256"; - private static final int TOKEN_START_PERIOD_IN_SECONDS = 5 * 60; - private static final int TOKEN_EXPIRY_PERIOD_IN_SECONDS = 24 * 60 * 60; - public static final DateTimeFormatter ISO_8601_UTC_DATE_FORMATTER = + public enum AuthenticationVersion { + Nov18("2018-11-09"), + Dec19("2019-12-12"); + + private final String ver; + + AuthenticationVersion(String version) { + this.ver = version; + } + + @Override + public String toString() { + return ver; + } + } + + protected static final Logger LOG = LoggerFactory.getLogger(SASGenerator.class); + public static final int FIVE_MINUTES = 5 * 60; + public static final int ONE_DAY = 24 * 60 * 60; + public static final DateTimeFormatter ISO_8601_FORMATTER = DateTimeFormatter .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT) .withZone(ZoneId.of("UTC")); + private Mac hmacSha256; private byte[] key; - public SASGenerator(byte[] key) { + // hide default constructor + private SASGenerator() { + } + + /** + * Called by subclasses to initialize the cryptographic SHA-256 HMAC provider. + * @param key - a 256-bit secret key + */ + protected SASGenerator(byte[] key) { this.key = key; initializeMac(); } - public String getContainerSASWithFullControl(String accountName, String containerName) { - String sp = "rcwdl"; - String sv = "2018-11-09"; - String sr = "c"; - String st = ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().minusSeconds(TOKEN_START_PERIOD_IN_SECONDS)); - String se = - ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().plusSeconds(TOKEN_EXPIRY_PERIOD_IN_SECONDS)); - - String signature = computeSignatureForSAS(sp, st, se, sv, "c", - accountName, containerName); - - AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder(); - qb.addQuery("sp", sp); - qb.addQuery("st", st); - qb.addQuery("se", se); - qb.addQuery("sv", sv); - qb.addQuery("sr", sr); - qb.addQuery("sig", signature); - return qb.toString().substring(1); - } - - private String computeSignatureForSAS(String sp, String st, - String se, String sv, String sr, String accountName, String containerName) { - - StringBuilder sb = new StringBuilder(); - sb.append(sp); - sb.append("\n"); - sb.append(st); - sb.append("\n"); - sb.append(se); - sb.append("\n"); - // canonicalized resource - sb.append("/blob/"); - sb.append(accountName); - sb.append("/"); - sb.append(containerName); - sb.append("\n"); - sb.append("\n"); // si - sb.append("\n"); // sip - sb.append("\n"); // spr - sb.append(sv); - sb.append("\n"); - sb.append(sr); - sb.append("\n"); - sb.append("\n"); // - For optional : rscc - ResponseCacheControl - sb.append("\n"); // - For optional : rscd - ResponseContentDisposition - sb.append("\n"); // - For optional : rsce - ResponseContentEncoding - sb.append("\n"); // - For optional : rscl - ResponseContentLanguage - sb.append("\n"); // - For optional : rsct - ResponseContentType - - String stringToSign = sb.toString(); - return computeHmac256(stringToSign); - } - private void initializeMac() { // Initializes the HMAC-SHA256 Mac and SecretKey. try { - hmacSha256 = Mac.getInstance(HMAC_SHA256); - hmacSha256.init(new SecretKeySpec(key, HMAC_SHA256)); + hmacSha256 = Mac.getInstance("HmacSHA256"); + hmacSha256.init(new SecretKeySpec(key, "HmacSHA256")); } catch (final Exception e) { throw new IllegalArgumentException(e); } } - private String computeHmac256(final String stringToSign) { + protected String computeHmac256(final String stringToSign) { byte[] utf8Bytes; try { - utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8); + utf8Bytes = stringToSign.getBytes(StandardCharsets.UTF_8.toString()); } catch (final UnsupportedEncodingException e) { throw new IllegalArgumentException(e); } @@ -126,4 +96,4 @@ private String computeHmac256(final String stringToSign) { } return Base64.encode(hmac); } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java new file mode 100644 index 0000000000..a76681b599 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +import java.time.Instant; + +import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder; + +/** + * Test Service SAS generator. + */ +public class ServiceSASGenerator extends SASGenerator { + + /** + * Creates a SAS Generator for Service SAS + * (https://docs.microsoft.com/en-us/rest/api/storageservices/create-service-sas). + * @param accountKey - the storage account key + */ + public ServiceSASGenerator(byte[] accountKey) { + super(accountKey); + } + + public String getContainerSASWithFullControl(String accountName, String containerName) { + String sp = "rcwdl"; + String sv = AuthenticationVersion.Nov18.toString(); + String sr = "c"; + String st = ISO_8601_FORMATTER.format(Instant.now().minusSeconds(FIVE_MINUTES)); + String se = ISO_8601_FORMATTER.format(Instant.now().plusSeconds(ONE_DAY)); + + String signature = computeSignatureForSAS(sp, st, se, sv, "c", + accountName, containerName, null); + + AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder(); + qb.addQuery("sp", sp); + qb.addQuery("st", st); + qb.addQuery("se", se); + qb.addQuery("sv", sv); + qb.addQuery("sr", sr); + qb.addQuery("sig", signature); + return qb.toString().substring(1); + } + + private String computeSignatureForSAS(String sp, String st, String se, String sv, + String sr, String accountName, String containerName, String path) { + + StringBuilder sb = new StringBuilder(); + sb.append(sp); + sb.append("\n"); + sb.append(st); + sb.append("\n"); + sb.append(se); + sb.append("\n"); + // canonicalized resource + sb.append("/blob/"); + sb.append(accountName); + sb.append("/"); + sb.append(containerName); + if (path != null && sr != "c") { + //sb.append("/"); + sb.append(path); + } + sb.append("\n"); + sb.append("\n"); // si + sb.append("\n"); // sip + sb.append("\n"); // spr + sb.append(sv); + sb.append("\n"); + sb.append(sr); + sb.append("\n"); + sb.append("\n"); // - For optional : rscc - ResponseCacheControl + sb.append("\n"); // - For optional : rscd - ResponseContentDisposition + sb.append("\n"); // - For optional : rsce - ResponseContentEncoding + sb.append("\n"); // - For optional : rscl - ResponseContentLanguage + sb.append("\n"); // - For optional : rsct - ResponseContentType + + String stringToSign = sb.toString(); + LOG.debug("Service SAS stringToSign: " + stringToSign.replace("\n", ".")); + return computeHmac256(stringToSign); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java new file mode 100644 index 0000000000..1016d4bbbb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.utils; + +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS; +import static java.time.temporal.ChronoUnit.SECONDS; + +/** + * Test CachedSASToken. + */ +public final class TestCachedSASToken { + + @Test + public void testUpdateAndGet() throws IOException { + CachedSASToken cachedSasToken = new CachedSASToken(); + + String se1 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + String token1 = "se=" + se1; + + // set first time and ensure reference equality + cachedSasToken.update(token1); + String cachedToken = cachedSasToken.get(); + Assert.assertTrue(token1 == cachedToken); + + // update with same token and ensure reference equality + cachedSasToken.update(token1); + cachedToken = cachedSasToken.get(); + Assert.assertTrue(token1 == cachedToken); + + // renew and ensure reference equality + String se2 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + String token2 = "se=" + se2; + cachedSasToken.update(token2); + cachedToken = cachedSasToken.get(); + Assert.assertTrue(token2 == cachedToken); + + // renew and ensure reference equality with ske + String se3 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 4, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + + String ske3 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + String token3 = "se=" + se3 + "&ske=" + ske3; + cachedSasToken.update(token3); + cachedToken = cachedSasToken.get(); + Assert.assertTrue(token3 == cachedToken); + } + + @Test + public void testGetExpiration() throws IOException { + CachedSASToken cachedSasToken = new CachedSASToken(); + + String se = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + OffsetDateTime seDate = OffsetDateTime.parse(se, DateTimeFormatter.ISO_DATE_TIME); + String token = "se=" + se; + + // By-pass the normal validation provided by update method + // by callng set with expired SAS, then ensure the get + // method returns null (auto expiration as next REST operation will use + // SASTokenProvider to get a new SAS). + cachedSasToken.setForTesting(token, seDate); + String cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + } + + @Test + public void testUpdateAndGetWithExpiredToken() throws IOException { + CachedSASToken cachedSasToken = new CachedSASToken(); + + String se1 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + String token1 = "se=" + se1; + + // set expired token and ensure not cached + cachedSasToken.update(token1); + String cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + String se2 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS * 2, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + + String ske2 = OffsetDateTime.now(ZoneOffset.UTC).plus( + DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS - 1, + SECONDS).format(DateTimeFormatter.ISO_DATE_TIME); + String token2 = "se=" + se2 + "&ske=" + ske2; + + // set with expired ske and ensure not cached + cachedSasToken.update(token2); + cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + } + + @Test + public void testUpdateAndGetWithInvalidToken() throws IOException { + CachedSASToken cachedSasToken = new CachedSASToken(); + + // set and ensure reference that it is not cached + String token1 = "se="; + cachedSasToken.update(token1); + String cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + // set and ensure reference that it is not cached + String token2 = "se=xyz"; + cachedSasToken.update(token2); + cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + // set and ensure reference that it is not cached + String token3 = "se=2100-01-01T00:00:00Z&ske="; + cachedSasToken.update(token3); + cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + // set and ensure reference that it is not cached + String token4 = "se=2100-01-01T00:00:00Z&ske=xyz&"; + cachedSasToken.update(token4); + cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + + // set and ensure reference that it is not cached + String token5 = "se=abc&ske=xyz&"; + cachedSasToken.update(token5); + cachedToken = cachedSasToken.get(); + Assert.assertNull(cachedToken); + } +} \ No newline at end of file