HADOOP-15692. ABFS: extensible support for custom oauth.
Contributed by Junhua Gu and Rajeev Bansal.
This commit is contained in:
parent
dd2b22fa31
commit
df57c6c3b1
@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
|
||||
@ -43,13 +42,14 @@ import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValida
|
||||
import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
|
||||
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
|
||||
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdaptee;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
||||
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
|
||||
@ -57,7 +57,7 @@ import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
|
||||
|
||||
/**
|
||||
* Configuration for Azure Blob FileSystem.
|
||||
@ -69,83 +69,86 @@ public class AbfsConfiguration{
|
||||
private final boolean isSecure;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
||||
MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
|
||||
MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE)
|
||||
MinValue = MIN_BUFFER_SIZE,
|
||||
MaxValue = MAX_BUFFER_SIZE,
|
||||
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
|
||||
private int writeBufferSize;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
|
||||
MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE,
|
||||
MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE)
|
||||
MinValue = MIN_BUFFER_SIZE,
|
||||
MaxValue = MAX_BUFFER_SIZE,
|
||||
DefaultValue = DEFAULT_READ_BUFFER_SIZE)
|
||||
private int readBufferSize;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL)
|
||||
DefaultValue = DEFAULT_MIN_BACKOFF_INTERVAL)
|
||||
private int minBackoffInterval;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_BACKOFF_INTERVAL,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL)
|
||||
DefaultValue = DEFAULT_MAX_BACKOFF_INTERVAL)
|
||||
private int maxBackoffInterval;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL)
|
||||
DefaultValue = DEFAULT_BACKOFF_INTERVAL)
|
||||
private int backoffInterval;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_RETRIES,
|
||||
MinValue = 0,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS)
|
||||
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
|
||||
private int maxIoRetries;
|
||||
|
||||
@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
|
||||
MinValue = 0,
|
||||
MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE,
|
||||
DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE)
|
||||
MaxValue = MAX_AZURE_BLOCK_SIZE,
|
||||
DefaultValue = MAX_AZURE_BLOCK_SIZE)
|
||||
private long azureBlockSize;
|
||||
|
||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME,
|
||||
DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT)
|
||||
DefaultValue = AZURE_BLOCK_LOCATION_HOST_DEFAULT)
|
||||
private String azureBlockLocationHost;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT,
|
||||
MinValue = 1,
|
||||
DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS)
|
||||
DefaultValue = MAX_CONCURRENT_WRITE_THREADS)
|
||||
private int maxConcurrentWriteThreads;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN,
|
||||
MinValue = 1,
|
||||
DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS)
|
||||
DefaultValue = MAX_CONCURRENT_READ_THREADS)
|
||||
private int maxConcurrentReadThreads;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_TOLERATE_CONCURRENT_APPEND,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
|
||||
DefaultValue = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND)
|
||||
private boolean tolerateOobAppends;
|
||||
|
||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ATOMIC_RENAME_KEY,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
|
||||
DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
|
||||
private String azureAtomicDirs;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
|
||||
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
|
||||
private boolean createRemoteFileSystemDuringInitialization;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION)
|
||||
DefaultValue = DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION)
|
||||
private boolean skipUserGroupMetadataDuringInitialization;
|
||||
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_QUEUE_DEPTH,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH)
|
||||
DefaultValue = DEFAULT_READ_AHEAD_QUEUE_DEPTH)
|
||||
private int readAheadQueueDepth;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH,
|
||||
DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH)
|
||||
DefaultValue = DEFAULT_ENABLE_FLUSH)
|
||||
private boolean enableFlush;
|
||||
|
||||
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
|
||||
DefaultValue = "")
|
||||
DefaultValue = "")
|
||||
private String userAgentId;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_DELEGATION_TOKEN,
|
||||
DefaultValue = DEFAULT_ENABLE_DELEGATION_TOKEN)
|
||||
private boolean enableDelegationToken;
|
||||
|
||||
private Map<String, String> storageAccountKeys;
|
||||
|
||||
public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
@ -292,6 +295,14 @@ public class AbfsConfiguration{
|
||||
return configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
|
||||
}
|
||||
|
||||
public boolean isDelegationTokenManagerEnabled() {
|
||||
return enableDelegationToken;
|
||||
}
|
||||
|
||||
public AbfsDelegationTokenManager getDelegationTokenManager() throws IOException {
|
||||
return new AbfsDelegationTokenManager(configuration);
|
||||
}
|
||||
|
||||
public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException {
|
||||
AuthType authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
|
||||
if (authType == AuthType.OAuth) {
|
||||
|
@ -61,9 +61,11 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnh
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
|
||||
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
@ -82,6 +84,9 @@ public class AzureBlobFileSystem extends FileSystem {
|
||||
private AzureBlobFileSystemStore abfsStore;
|
||||
private boolean isClosed;
|
||||
|
||||
private boolean delegationTokenEnabled = false;
|
||||
private AbfsDelegationTokenManager delegationTokenManager;
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration configuration)
|
||||
throws IOException {
|
||||
@ -112,6 +117,15 @@ public class AzureBlobFileSystem extends FileSystem {
|
||||
//Provide a default group name
|
||||
this.primaryUserGroup = this.user;
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
this.delegationTokenEnabled = abfsStore.getAbfsConfiguration().isDelegationTokenManagerEnabled();
|
||||
|
||||
if(this.delegationTokenEnabled) {
|
||||
LOG.debug("Initializing DelegationTokenManager for {}", uri);
|
||||
this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -815,6 +829,20 @@ public class AzureBlobFileSystem extends FileSystem {
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a delegation token from remote service endpoint if
|
||||
* 'fs.azure.enable.kerberos.support' is set to 'true', and
|
||||
* 'fs.azure.enable.delegation.token' is set to 'true'.
|
||||
* @param renewer the account name that is allowed to renew the token.
|
||||
* @return delegation token
|
||||
* @throws IOException thrown when getting the current user.
|
||||
*/
|
||||
@Override
|
||||
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
|
||||
return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer)
|
||||
: super.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
FileSystem.Statistics getFsStatistics() {
|
||||
return this.statistics;
|
||||
|
@ -79,5 +79,8 @@ public final class ConfigurationKeys {
|
||||
/** Prefix for oauth refresh token: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token.";
|
||||
|
||||
public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = "fs.azure.enable.delegation.token";
|
||||
public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = "fs.azure.delegation.token.provider.type";
|
||||
|
||||
private ConfigurationKeys() {}
|
||||
}
|
||||
|
@ -61,5 +61,6 @@ public final class FileSystemConfigurations {
|
||||
public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
|
||||
= SSLSocketFactoryEx.SSLChannelMode.Default;
|
||||
|
||||
public static final boolean DEFAULT_ENABLE_DELEGATION_TOKEN = false;
|
||||
private FileSystemConfigurations() {}
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.extensions;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
/**
|
||||
* Interface for Managing the Delegation tokens.
|
||||
*/
|
||||
public interface CustomDelegationTokenManager {
|
||||
|
||||
/**
|
||||
* Initialize with supported configuration. This method is invoked when the
|
||||
* (URI, Configuration)} method is invoked.
|
||||
*
|
||||
* @param configuration Configuration object
|
||||
* @throws IOException if instance can not be configured.
|
||||
*/
|
||||
void initialize(Configuration configuration)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Get Delegation token.
|
||||
* @param renewer delegation token renewer
|
||||
* @return delegation token
|
||||
* @throws IOException when error in getting the delegation token
|
||||
*/
|
||||
Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Renew the delegation token.
|
||||
* @param token delegation token.
|
||||
* @return renewed time.
|
||||
* @throws IOException when error in renewing the delegation token
|
||||
*/
|
||||
long renewDelegationToken(Token<?> token) throws IOException;
|
||||
|
||||
/**
|
||||
* Cancel the delegation token.
|
||||
* @param token delegation token.
|
||||
* @throws IOException when error in cancelling the delegation token.
|
||||
*/
|
||||
void cancelDelegationToken(Token<?> token) throws IOException;
|
||||
}
|
@ -16,7 +16,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.oauth2;
|
||||
package org.apache.hadoop.fs.azurebfs.extensions;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
@ -47,8 +47,8 @@ public interface CustomTokenProviderAdaptee {
|
||||
* @param accountName Account Name
|
||||
* @throws IOException if instance can not be configured.
|
||||
*/
|
||||
void initialize(Configuration configuration, String accountName)
|
||||
throws IOException;
|
||||
void initialize(Configuration configuration, final String accountName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Obtain the access token that should be added to https connection's header.
|
||||
@ -72,4 +72,4 @@ public interface CustomTokenProviderAdaptee {
|
||||
* @return Date to expire access token retrieved from AAD.
|
||||
*/
|
||||
Date getExpiryTime();
|
||||
}
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Public
|
||||
package org.apache.hadoop.fs.azurebfs.extensions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
|
||||
|
||||
/**
|
||||
* Provides tokens based on custom implementation, following the Adapter Design
|
||||
|
@ -0,0 +1,49 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.security;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
|
||||
/**
|
||||
* Delegation token Identifier for ABFS delegation tokens.
|
||||
*/
|
||||
public class AbfsDelegationTokenIdentifier extends DelegationTokenIdentifier {
|
||||
public static final Text TOKEN_KIND = new Text("ABFS delegation");
|
||||
|
||||
public AbfsDelegationTokenIdentifier(){
|
||||
super(TOKEN_KIND);
|
||||
}
|
||||
|
||||
public AbfsDelegationTokenIdentifier(Text kind) {
|
||||
super(kind);
|
||||
}
|
||||
|
||||
public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
|
||||
Text realUser) {
|
||||
super(kind, owner, renewer, realUser);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return TOKEN_KIND;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,88 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.security;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.extensions.CustomDelegationTokenManager;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Class for delegation token Manager.
|
||||
*/
|
||||
public class AbfsDelegationTokenManager {
|
||||
|
||||
private CustomDelegationTokenManager tokenManager;
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbfsDelegationTokenManager.class);
|
||||
|
||||
public AbfsDelegationTokenManager(final Configuration conf) throws IOException {
|
||||
|
||||
Preconditions.checkNotNull(conf, "conf");
|
||||
|
||||
Class<? extends CustomDelegationTokenManager> customDelegationTokenMgrClass =
|
||||
conf.getClass(ConfigurationKeys.FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE, null,
|
||||
CustomDelegationTokenManager.class);
|
||||
|
||||
if (customDelegationTokenMgrClass == null) {
|
||||
throw new IllegalArgumentException(
|
||||
"The value for \"fs.azure.delegation.token.provider.type\" is not defined.");
|
||||
}
|
||||
|
||||
CustomDelegationTokenManager customTokenMgr = (CustomDelegationTokenManager) ReflectionUtils
|
||||
.newInstance(customDelegationTokenMgrClass, conf);
|
||||
if (customTokenMgr == null) {
|
||||
throw new IllegalArgumentException(String.format("Failed to initialize %s.", customDelegationTokenMgrClass));
|
||||
}
|
||||
|
||||
customTokenMgr.initialize(conf);
|
||||
|
||||
tokenManager = customTokenMgr;
|
||||
}
|
||||
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(
|
||||
String renewer) throws IOException {
|
||||
|
||||
Token<DelegationTokenIdentifier> token = tokenManager.getDelegationToken(renewer);
|
||||
|
||||
token.setKind(AbfsDelegationTokenIdentifier.TOKEN_KIND);
|
||||
return token;
|
||||
}
|
||||
|
||||
public long renewDelegationToken(Token<?> token)
|
||||
throws IOException {
|
||||
|
||||
return tokenManager.renewDelegationToken(token);
|
||||
}
|
||||
|
||||
public void cancelDelegationToken(Token<?> token)
|
||||
throws IOException {
|
||||
|
||||
tokenManager.cancelDelegationToken(token);
|
||||
}
|
||||
}
|
@ -0,0 +1,96 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.security;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
|
||||
/**
|
||||
* Token Renewer for renewing ABFS delegation tokens with remote service.
|
||||
*/
|
||||
public class AbfsTokenRenewer extends TokenRenewer {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbfsTokenRenewer.class);
|
||||
|
||||
/**
|
||||
* Checks if this particular object handles the Kind of token passed.
|
||||
*
|
||||
* @param kind the kind of the token
|
||||
* @return true if it handles passed token kind false otherwise.
|
||||
*/
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return AbfsDelegationTokenIdentifier.TOKEN_KIND.equals(kind);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if passed token is managed.
|
||||
*
|
||||
* @param token the token being checked
|
||||
* @return true if it is managed.
|
||||
* @throws IOException thrown when evaluating if token is managed.
|
||||
*/
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew the delegation token.
|
||||
*
|
||||
* @param token token to renew.
|
||||
* @param conf configuration object.
|
||||
* @return extended expiry time of the token.
|
||||
* @throws IOException thrown when trying get current user.
|
||||
* @throws InterruptedException thrown when thread is interrupted
|
||||
*/
|
||||
@Override
|
||||
public long renew(final Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
LOG.debug("Renewing the delegation token");
|
||||
return getInstance(conf).renewDelegationToken(token);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel the delegation token.
|
||||
*
|
||||
* @param token token to cancel.
|
||||
* @param conf configuration object.
|
||||
* @throws IOException thrown when trying get current user.
|
||||
* @throws InterruptedException thrown when thread is interrupted.
|
||||
*/
|
||||
@Override
|
||||
public void cancel(final Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
LOG.debug("Cancelling the delegation token");
|
||||
getInstance(conf).cancelDelegationToken(token);
|
||||
}
|
||||
|
||||
private AbfsDelegationTokenManager getInstance(Configuration conf)
|
||||
throws IOException {
|
||||
return new AbfsDelegationTokenManager(conf);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
package org.apache.hadoop.fs.azurebfs.security;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
@ -13,4 +13,5 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenIdentifier
|
||||
org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier
|
@ -13,4 +13,5 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.hadoop.fs.azurebfs.security.AbfsTokenRenewer
|
||||
org.apache.hadoop.fs.azure.security.WasbTokenRenewer
|
@ -56,6 +56,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||
@Test
|
||||
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||
touch(TEST_FILE);
|
||||
validateStatus(fs, TEST_FILE, false);
|
||||
}
|
||||
@ -63,7 +64,6 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||
private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, final boolean isDir)
|
||||
throws IOException {
|
||||
FileStatus fileStatus = fs.getFileStatus(name);
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||
|
||||
String errorInStatus = "error in " + fileStatus + " from " + fs;
|
||||
|
||||
@ -89,6 +89,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
||||
@Test
|
||||
public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
fs.getConf().set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, DEFAULT_UMASK_VALUE);
|
||||
fs.mkdirs(TEST_FOLDER);
|
||||
|
||||
validateStatus(fs, TEST_FOLDER, true);
|
||||
|
Loading…
x
Reference in New Issue
Block a user