HADOOP-13945. Azure: Add Kerberos and Delegation token support to WASB client. Contributed by Santhosh G Nayak

Change-Id: I39fb0a3b0491bf2160571366939a0502a0045429
This commit is contained in:
Mingliang Liu 2017-03-20 16:28:15 -07:00
parent 49efd5d204
commit 8e15e24059
14 changed files with 714 additions and 107 deletions

View File

@ -478,7 +478,7 @@ public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentati
this.storageInteractionLayer = new StorageInterfaceImpl();
} else {
this.storageInteractionLayer = new SecureStorageInterfaceImpl(
useLocalSasKeyMode, conf, delegationToken);
useLocalSasKeyMode, conf);
}
}

View File

@ -27,7 +27,9 @@
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@ -61,10 +63,16 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@ -1107,6 +1115,9 @@ private void restoreKey() throws IOException {
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
private boolean appendSupportEnabled = false;
private DelegationTokenAuthenticatedURL authURL;
private DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
private String credServiceUrl;
/**
* Configuration key to enable authorization support in WASB.
@ -1124,6 +1135,11 @@ private void restoreKey() throws IOException {
*/
private boolean azureAuthorization = false;
/**
* Flag controlling Kerberos support in WASB.
*/
private boolean kerberosSupportEnabled = false;
/**
* Authorizer to use when authorization support is enabled in
* WASB.
@ -1278,6 +1294,12 @@ public void initialize(URI uri, Configuration conf)
new RemoteWasbAuthorizerImpl();
authorizer.init(conf);
}
if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
authURL = new DelegationTokenAuthenticatedURL(authenticator);
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
}
}
@Override
@ -1406,7 +1428,7 @@ private void performAuthCheck(String path, String accessType,
String operation) throws WasbAuthorizationException, IOException {
if (azureAuthorization && this.authorizer != null &&
!this.authorizer.authorize(path, accessType, delegationToken)) {
!this.authorizer.authorize(path, accessType)) {
throw new WasbAuthorizationException(operation
+ " operation for Path : " + path + " not allowed");
}
@ -2899,6 +2921,49 @@ public synchronized void close() throws IOException {
isClosed = true;
}
/**
* Get a delegation token from remote service endpoint if
* 'fs.azure.enable.kerberos.support' is set to 'true'.
* @param renewer the account name that is allowed to renew the token.
* @return delegation token
* @throws IOException thrown when getting the current user.
*/
@Override
public Token<?> getDelegationToken(final String renewer) throws IOException {
if (kerberosSupportEnabled) {
try {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
}
if (!connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
connectUgi.checkTGTAndReloginFromKeytab();
return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {
return authURL.getDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
}
});
} catch (Exception ex) {
LOG.error("Error in fetching the delegation token from remote service",
ex);
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw new IOException(ex);
}
}
} else {
return super.getDelegationToken(renewer);
}
}
/**
* A handler that defines what to do with blobs whose upload was
* interrupted.

View File

@ -21,10 +21,22 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,12 +55,6 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
public static final Logger LOG =
LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
/**
* Configuration parameter name expected in the Configuration
* object to provide the url of the remote service {@value}
*/
private static final String KEY_CRED_SERVICE_URL =
"fs.azure.cred.service.url";
/**
* Container SAS Key generation OP name. {@value}
@ -82,7 +88,7 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation_token";
"delegation";
/**
* Query parameter name for the relative path inside the storage
@ -94,41 +100,50 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
private String delegationToken = "";
private String credServiceUrl = "";
private WasbRemoteCallHelper remoteCallHelper = null;
private boolean isSecurityEnabled;
private boolean isKerberosSupportEnabled;
public RemoteSASKeyGeneratorImpl(Configuration conf) {
super(conf);
}
public boolean initialize(Configuration conf, String delegationToken) {
public void initialize(Configuration conf) throws IOException {
LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
credServiceUrl = conf.get(KEY_CRED_SERVICE_URL);
if (delegationToken == null || delegationToken.isEmpty()) {
LOG.error("Delegation Token not provided for initialization"
+ " of RemoteSASKeyGenerator");
return false;
try {
delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
} catch (IOException e) {
final String msg = "Error in fetching the WASB delegation token";
LOG.error(msg, e);
throw new IOException(msg, e);
}
this.delegationToken = delegationToken;
try {
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
} catch (UnknownHostException e) {
final String msg = "Invalid CredService Url, configure it correctly";
LOG.error(msg, e);
throw new IOException(msg, e);
}
if (credServiceUrl == null || credServiceUrl.isEmpty()) {
LOG.error("CredService Url not found in configuration to initialize"
+ " RemoteSASKeyGenerator");
return false;
final String msg = "CredService Url not found in configuration to "
+ "initialize RemoteSASKeyGenerator";
LOG.error(msg);
throw new IOException(msg);
}
remoteCallHelper = new WasbRemoteCallHelper();
LOG.debug("Initialization of RemoteSASKeyGenerator instance successfull");
return true;
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
this.isKerberosSupportEnabled = conf.getBoolean(
Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
}
@Override
public URI getContainerSASUri(String storageAccount, String container)
throws SASKeyGenerationException {
try {
LOG.debug("Generating Container SAS Key for Container {} "
+ "inside Storage Account {} ", container, storageAccount);
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
@ -139,38 +154,39 @@ public URI getContainerSASUri(String storageAccount, String container)
container);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
RemoteSASKeyGenerationResponse sasKeyResponse =
makeRemoteRequest(uriBuilder.build());
if (sasKeyResponse == null) {
throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ " object null from remote call");
} else if (sasKeyResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException("Remote Service encountered error"
+ " in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
if (isSecurityEnabled && !connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
return getSASKey(uriBuilder.build(), connectUgi);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "while building the HttpGetRequest to remote cred service",
uriSyntaxEx);
} catch (IOException e) {
throw new SASKeyGenerationException("Encountered IOException"
+ " while building the HttpGetRequest to remote service", e);
}
}
@Override
public URI getRelativeBlobSASUri(String storageAccount, String container,
String relativePath) throws SASKeyGenerationException {
try {
LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
+ " Container {} inside Storage Account {} ",
+ " Container {} inside Storage Account {} ",
relativePath, container, storageAccount);
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
uriBuilder.setPath("/" + BLOB_SAS_OP);
@ -182,41 +198,98 @@ public URI getRelativeBlobSASUri(String storageAccount, String container,
relativePath);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
RemoteSASKeyGenerationResponse sasKeyResponse =
makeRemoteRequest(uriBuilder.build());
if (sasKeyResponse == null) {
throw new SASKeyGenerationException("RemoteSASKeyGenerationResponse"
+ " object null from remote call");
} else if (sasKeyResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException("Remote Service encountered error"
+ " in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
if (isSecurityEnabled && StringUtils.isNotEmpty(
delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
if (isSecurityEnabled && !connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
return getSASKey(uriBuilder.build(), connectUgi);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException"
+ " while building the HttpGetRequest to " + " remote service",
uriSyntaxEx);
} catch (IOException e) {
throw new SASKeyGenerationException("Encountered IOException"
+ " while building the HttpGetRequest to remote service", e);
}
}
private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
throws URISyntaxException, SASKeyGenerationException {
final RemoteSASKeyGenerationResponse sasKeyResponse;
try {
connectUgi.checkTGTAndReloginFromKeytab();
sasKeyResponse = connectUgi.doAs(
new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
@Override
public RemoteSASKeyGenerationResponse run() throws Exception {
AuthenticatedURL.Token token = null;
if (isKerberosSupportEnabled && UserGroupInformation
.isSecurityEnabled() && (delegationToken == null
|| delegationToken.isEmpty())) {
token = new AuthenticatedURL.Token();
final Authenticator kerberosAuthenticator =
new KerberosDelegationTokenAuthenticator();
try {
kerberosAuthenticator.authenticate(uri.toURL(), token);
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. "
+ "The request cannot proceed.");
} catch (AuthenticationException e) {
throw new IOException(
"Authentication failed in check authorization", e);
}
}
return makeRemoteRequest(uri,
(token != null ? token.toString() : null));
}
});
} catch (InterruptedException | IOException e) {
final String msg = "Error fetching SAS Key from Remote Service: " + uri;
LOG.error(msg, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new SASKeyGenerationException(msg, e);
}
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException(
"Remote Service encountered error in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
}
}
/**
* Helper method to make a remote request.
* @param uri - Uri to use for the remote request
* @param token - hadoop.auth token for the remote request
* @return RemoteSASKeyGenerationResponse
*/
private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri)
throws SASKeyGenerationException {
private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
String token) throws SASKeyGenerationException {
try {
String responseBody =
remoteCallHelper.makeRemoteGetRequest(new HttpGet(uri));
HttpGet httpGet = new HttpGet(uri);
if (token != null) {
httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
String responseBody = remoteCallHelper.makeRemoteGetRequest(httpGet);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(responseBody,

View File

@ -22,12 +22,27 @@
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
@ -39,7 +54,10 @@
*/
public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
private String remoteAuthorizerServiceUrl = "";
public static final Logger LOG = LoggerFactory
.getLogger(RemoteWasbAuthorizerImpl.class);
private String remoteAuthorizerServiceUrl = null;
/**
* Configuration parameter name expected in the Configuration object to
@ -70,9 +88,12 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation_token";
"delegation";
private WasbRemoteCallHelper remoteCallHelper = null;
private String delegationToken;
private boolean isSecurityEnabled;
private boolean isKerberosSupportEnabled;
@VisibleForTesting
public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
@ -82,55 +103,113 @@ public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
@Override
public void init(Configuration conf)
throws WasbAuthorizationException, IOException {
LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
Iterator<Token<? extends TokenIdentifier>> tokenIterator = null;
try {
delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
} catch (IOException e) {
final String msg = "Error in fetching the WASB delegation token";
LOG.error(msg, e);
throw new IOException(msg, e);
}
remoteAuthorizerServiceUrl = conf.get(KEY_REMOTE_AUTH_SERVICE_URL);
remoteAuthorizerServiceUrl = SecurityUtils
.getRemoteAuthServiceUrls(conf);
if (remoteAuthorizerServiceUrl == null
|| remoteAuthorizerServiceUrl.isEmpty()) {
throw new WasbAuthorizationException(
"fs.azure.authorization.remote.service.url config not set"
+ " in configuration.");
+ " in configuration.");
}
this.remoteCallHelper = new WasbRemoteCallHelper();
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
this.isKerberosSupportEnabled = conf
.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
}
@Override
public boolean authorize(String wasbAbsolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException, IOException {
public boolean authorize(String wasbAbsolutePath, String accessType)
throws WasbAuthorizationException, IOException {
try {
URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
wasbAbsolutePath);
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
accessType);
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
delegationToken);
}
try {
URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
wasbAbsolutePath);
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
accessType);
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
delegationToken);
String responseBody = null;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
if (isSecurityEnabled && !connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
connectUgi.checkTGTAndReloginFromKeytab();
String responseBody = remoteCallHelper.makeRemoteGetRequest(
new HttpGet(uriBuilder.build()));
try {
responseBody = connectUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
AuthenticatedURL.Token token = null;
HttpGet httpGet = new HttpGet(uriBuilder.build());
if (isKerberosSupportEnabled && UserGroupInformation
.isSecurityEnabled() && (delegationToken == null
|| delegationToken.isEmpty())) {
token = new AuthenticatedURL.Token();
final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
try {
kerberosAuthenticator
.authenticate(uriBuilder.build().toURL(), token);
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. The request cannot proceed.");
} catch (AuthenticationException e){
throw new IOException("Authentication failed in check authorization", e);
}
if (token != null) {
httpGet.setHeader("Cookie",
AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
}
return remoteCallHelper.makeRemoteGetRequest(httpGet);
}
});
} catch (InterruptedException e) {
LOG.error("Error in check authorization", e);
throw new WasbAuthorizationException("Error in check authorize", e);
}
ObjectMapper objectMapper = new ObjectMapper();
RemoteAuthorizerResponse authorizerResponse =
objectMapper.readValue(responseBody, RemoteAuthorizerResponse.class);
ObjectMapper objectMapper = new ObjectMapper();
RemoteAuthorizerResponse authorizerResponse =
objectMapper
.readValue(responseBody, RemoteAuthorizerResponse.class);
if (authorizerResponse == null) {
throw new WasbAuthorizationException(
"RemoteAuthorizerResponse object null from remote call");
} else if (authorizerResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return authorizerResponse.getAuthorizationResult();
} else {
throw new WasbAuthorizationException("Remote authorization"
+ " service encountered an error "
+ authorizerResponse.getResponseMessage());
if (authorizerResponse == null) {
throw new WasbAuthorizationException(
"RemoteAuthorizerResponse object null from remote call");
} else if (authorizerResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return authorizerResponse.getAuthorizationResult();
} else {
throw new WasbAuthorizationException("Remote authorization"
+ " serivce encountered an error "
+ authorizerResponse.getResponseMessage());
}
} catch (URISyntaxException | WasbRemoteCallException
| JsonParseException | JsonMappingException ex) {
throw new WasbAuthorizationException(ex);
}
} catch (URISyntaxException | WasbRemoteCallException
| JsonParseException | JsonMappingException ex) {
throw new WasbAuthorizationException(ex);
}
}
}

View File

@ -69,21 +69,20 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
public static final String SAS_ERROR_CODE = "SAS Error";
private SASKeyGeneratorInterface sasKeyGenerator;
private String storageAccount;
private String delegationToken;
public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
Configuration conf, String delegationToken)
throws SecureModeException {
Configuration conf) throws SecureModeException {
this.delegationToken = delegationToken;
if (useLocalSASKeyMode) {
this.sasKeyGenerator = new LocalSASKeyGeneratorImpl(conf);
} else {
RemoteSASKeyGeneratorImpl remoteSasKeyGenerator =
new RemoteSASKeyGeneratorImpl(conf);
if (!remoteSasKeyGenerator.initialize(conf, this.delegationToken)) {
try {
remoteSasKeyGenerator.initialize(conf);
} catch (IOException ioe) {
throw new SecureModeException("Remote SAS Key mode could"
+ " not be initialized");
+ " not be initialized", ioe);
}
this.sasKeyGenerator = remoteSasKeyGenerator;
}

View File

@ -43,11 +43,10 @@ public void init(Configuration conf)
* @param wasbAbolutePath : Absolute WASB Path used for access.
* @param accessType : Type of access
* @param delegationToken : The user information.
* @return : true - If access allowed false - If access is not allowed.
* @throws WasbAuthorizationException - On authorization exceptions
* @throws IOException - When not able to reach the authorizer
*/
public boolean authorize(String wasbAbolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException, IOException;
boolean authorize(String wasbAbolutePath, String accessType)
throws WasbAuthorizationException, IOException;
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
/**
* Constants for used with WASB security implementation.
*/
public final class Constants {
private Constants() {
}
/**
* Configuration parameter name expected in the Configuration
* object to provide the url of the remote service {@value}
*/
public static final String KEY_CRED_SERVICE_URL = "fs.azure.cred.service.url";
/**
* Default port of the remote service used as delegation token manager and Azure storage SAS key generator.
*/
public static final int DEFAULT_CRED_SERVICE_PORT = 50911;
/**
* Default remote delegation token manager endpoint.
*/
public static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT = "/tokenmanager/v1";
/**
* The configuration property to enable Kerberos support.
*/
public static final String AZURE_KERBEROS_SUPPORT_PROPERTY_NAME = "fs.azure.enable.kerberos.support";
/**
* Parameter to be used for impersonation.
*/
public static final String DOAS_PARAM = "doas";
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.RemoteWasbAuthorizerImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
/**
* Security Utils class for WASB.
*/
public final class SecurityUtils {
private SecurityUtils() {
}
/**
* Utility method to get remote service URLs from the configuration.
* @param conf configuration object.
* @return remote service URL
* @throws UnknownHostException thrown when getting the default value.
*/
public static String getCredServiceUrls(Configuration conf)
throws UnknownHostException {
return conf.get(Constants.KEY_CRED_SERVICE_URL, String
.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
}
/**
* Utility method to get remote Authorization service URLs from the configuration.
* @param conf Configuration object.
* @return remote Authorization server URL
* @throws UnknownHostException thrown when getting the default value.
*/
public static String getRemoteAuthServiceUrls(Configuration conf)
throws UnknownHostException {
return conf.get(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, String
.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
}
/**
* Utility method to get delegation token from the UGI credentials.
* @return delegation token
* @throws IOException thrown when getting the current user.
*/
public static String getDelegationTokenFromCredentials() throws IOException {
String delegationToken = null;
Iterator<Token<? extends TokenIdentifier>> tokenIterator = UserGroupInformation
.getCurrentUser().getCredentials().getAllTokens().iterator();
while (tokenIterator.hasNext()) {
Token<? extends TokenIdentifier> iteratedToken = tokenIterator.next();
if (iteratedToken.getKind()
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
delegationToken = iteratedToken.encodeToUrlString();
}
}
return delegationToken;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
/**
* Delegation token Identifier for WASB delegation tokens.
*/
public class WasbDelegationTokenIdentifier extends DelegationTokenIdentifier {
public static final Text TOKEN_KIND = new Text("WASB delegation");
public WasbDelegationTokenIdentifier(){
super(TOKEN_KIND);
}
public WasbDelegationTokenIdentifier(Text kind) {
super(kind);
}
public WasbDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
Text realUser) {
super(kind, owner, renewer, realUser);
}
@Override
public Text getKind() {
return TOKEN_KIND;
}
}

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
/**
* Token Renewer for renewing WASB delegation tokens with remote service.
*/
public class WasbTokenRenewer extends TokenRenewer {
public static final Logger LOG = LoggerFactory
.getLogger(WasbTokenRenewer.class);
/**
* Checks if this particular object handles the Kind of token passed.
* @param kind the kind of the token
* @return true if it handles passed token kind false otherwise.
*/
@Override
public boolean handleKind(Text kind) {
return WasbDelegationTokenIdentifier.TOKEN_KIND.equals(kind);
}
/**
* Checks if passed token is managed.
* @param token the token being checked
* @return true if it is managed.
* @throws IOException thrown when evaluating if token is managed.
*/
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
/**
* Renew the delegation token.
* @param token token to renew.
* @param conf configuration object.
* @return extended expiry time of the token.
* @throws IOException thrown when trying get current user.
* @throws InterruptedException thrown when thread is interrupted
*/
@Override
public long renew(final Token<?> token, Configuration conf)
throws IOException, InterruptedException {
LOG.debug("Renewing the delegation token");
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
}
if (!connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
connectUgi.checkTGTAndReloginFromKeytab();
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
authToken
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
String.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
authenticator);
return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return authURL.renewDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
}
});
}
/**
* Cancel the delegation token.
* @param token token to cancel.
* @param conf configuration object.
* @throws IOException thrown when trying get current user.
* @throws InterruptedException thrown when thread is interrupted.
*/
@Override
public void cancel(final Token<?> token, Configuration conf)
throws IOException, InterruptedException {
LOG.debug("Cancelling the delegation token");
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
}
if (!connectUgi.hasKerberosCredentials()) {
connectUgi = UserGroupInformation.getLoginUser();
}
connectUgi.checkTGTAndReloginFromKeytab();
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
authToken
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
String.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
authenticator);
connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
authURL.cancelDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
return null;
}
});
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.fs.azure.security.WasbTokenRenewer

View File

@ -44,8 +44,8 @@ public void addAuthRule(String wasbAbsolutePath,
}
@Override
public boolean authorize(String wasbAbsolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException {
public boolean authorize(String wasbAbsolutePath, String accessType)
throws WasbAuthorizationException {
AuthorizationComponent component =
new AuthorizationComponent(wasbAbsolutePath, accessType);