HADOOP-14443. Azure: Support retry and client side failover for authorization, SASKey and delegation token generation. Contributed by Santhosh G Nayak
This commit is contained in:
parent
bcba844d11
commit
38996fdcf0
@ -27,9 +27,7 @@
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
@ -65,15 +63,14 @@
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.SecurityUtils;
|
||||
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
@ -1177,7 +1174,7 @@ private void restoreKey() throws IOException {
|
||||
|
||||
private UserGroupInformation ugi;
|
||||
|
||||
private String delegationToken = null;
|
||||
private WasbDelegationTokenManager wasbDelegationTokenManager;
|
||||
|
||||
public NativeAzureFileSystem() {
|
||||
// set store in initialize()
|
||||
@ -1327,9 +1324,7 @@ public void initialize(URI uri, Configuration conf)
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
|
||||
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
|
||||
authURL = new DelegationTokenAuthenticatedURL(authenticator);
|
||||
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
|
||||
this.wasbDelegationTokenManager = new RemoteWasbDelegationTokenManager(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3002,31 +2997,7 @@ public synchronized void close() throws IOException {
|
||||
@Override
|
||||
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
|
||||
if (kerberosSupportEnabled) {
|
||||
try {
|
||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
final UserGroupInformation proxyUser = connectUgi;
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
}
|
||||
connectUgi.checkTGTAndReloginFromKeytab();
|
||||
return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> run() throws Exception {
|
||||
return authURL.getDelegationToken(new URL(credServiceUrl
|
||||
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
|
||||
authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
|
||||
}
|
||||
});
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Error in fetching the delegation token from remote service",
|
||||
ex);
|
||||
if (ex instanceof IOException) {
|
||||
throw (IOException) ex;
|
||||
} else {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
return wasbDelegationTokenManager.getDelegationToken(renewer);
|
||||
} else {
|
||||
return super.getDelegationToken(renewer);
|
||||
}
|
||||
|
@ -21,20 +21,16 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectReader;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.Validate;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.SecurityUtils;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
|
||||
@ -56,56 +52,65 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
|
||||
|
||||
private static final ObjectReader RESPONSE_READER = new ObjectMapper()
|
||||
.readerFor(RemoteSASKeyGenerationResponse.class);
|
||||
|
||||
/**
|
||||
* Configuration parameter name expected in the Configuration
|
||||
* object to provide the url of the remote service {@value}
|
||||
*/
|
||||
public static final String KEY_CRED_SERVICE_URLS =
|
||||
"fs.azure.cred.service.urls";
|
||||
/**
|
||||
* Configuration key to enable http retry policy for SAS Key generation. {@value}
|
||||
*/
|
||||
public static final String
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
|
||||
"fs.azure.saskeygenerator.http.retry.policy.enabled";
|
||||
/**
|
||||
* Configuration key for SAS Key Generation http retry policy spec. {@value}
|
||||
*/
|
||||
public static final String
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
|
||||
"fs.azure.saskeygenerator.http.retry.policy.spec";
|
||||
/**
|
||||
* Container SAS Key generation OP name. {@value}
|
||||
*/
|
||||
private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
|
||||
|
||||
/**
|
||||
* Relative Blob SAS Key generation OP name. {@value}
|
||||
*/
|
||||
private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
|
||||
|
||||
/**
|
||||
* Query parameter specifying the expiry period to be used for sas key
|
||||
* {@value}
|
||||
*/
|
||||
private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
|
||||
|
||||
/**
|
||||
* Query parameter name for the storage account. {@value}
|
||||
*/
|
||||
private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
|
||||
"storage_account";
|
||||
|
||||
/**
|
||||
* Query parameter name for the storage account container. {@value}
|
||||
*/
|
||||
private static final String CONTAINER_QUERY_PARAM_NAME =
|
||||
"container";
|
||||
|
||||
/**
|
||||
* Query parameter name for user info {@value}
|
||||
*/
|
||||
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
|
||||
"delegation";
|
||||
|
||||
private static final String CONTAINER_QUERY_PARAM_NAME = "container";
|
||||
/**
|
||||
* Query parameter name for the relative path inside the storage
|
||||
* account container. {@value}
|
||||
*/
|
||||
private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
|
||||
"relative_path";
|
||||
private static final String RELATIVE_PATH_QUERY_PARAM_NAME = "relative_path";
|
||||
/**
|
||||
* SAS Key Generation Remote http client retry policy spec. {@value}
|
||||
*/
|
||||
private static final String
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||
"1000,3,10000,2";
|
||||
|
||||
private String delegationToken;
|
||||
private String credServiceUrl = "";
|
||||
private WasbRemoteCallHelper remoteCallHelper = null;
|
||||
private boolean isSecurityEnabled;
|
||||
private boolean isKerberosSupportEnabled;
|
||||
private RetryPolicy retryPolicy;
|
||||
private String[] commaSeparatedUrls;
|
||||
|
||||
public RemoteSASKeyGeneratorImpl(Configuration conf) {
|
||||
super(conf);
|
||||
@ -114,180 +119,111 @@ public RemoteSASKeyGeneratorImpl(Configuration conf) {
|
||||
public void initialize(Configuration conf) throws IOException {
|
||||
|
||||
LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
|
||||
setDelegationToken();
|
||||
try {
|
||||
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
|
||||
} catch (UnknownHostException e) {
|
||||
final String msg = "Invalid CredService Url, configure it correctly";
|
||||
LOG.error(msg, e);
|
||||
throw new IOException(msg, e);
|
||||
}
|
||||
|
||||
if (credServiceUrl == null || credServiceUrl.isEmpty()) {
|
||||
final String msg = "CredService Url not found in configuration to "
|
||||
+ "initialize RemoteSASKeyGenerator";
|
||||
LOG.error(msg);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||
|
||||
remoteCallHelper = new WasbRemoteCallHelper();
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
this.isKerberosSupportEnabled = conf.getBoolean(
|
||||
Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||
this.isKerberosSupportEnabled =
|
||||
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||
this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
|
||||
if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
|
||||
throw new IOException(
|
||||
KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
|
||||
}
|
||||
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
||||
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
|
||||
} else {
|
||||
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
||||
}
|
||||
LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getContainerSASUri(String storageAccount, String container)
|
||||
throws SASKeyGenerationException {
|
||||
public URI getContainerSASUri(String storageAccount,
|
||||
String container) throws SASKeyGenerationException {
|
||||
RemoteSASKeyGenerationResponse sasKeyResponse = null;
|
||||
try {
|
||||
LOG.debug("Generating Container SAS Key for Container {} "
|
||||
+ "inside Storage Account {} ", container, storageAccount);
|
||||
setDelegationToken();
|
||||
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
|
||||
URIBuilder uriBuilder = new URIBuilder();
|
||||
uriBuilder.setPath("/" + CONTAINER_SAS_OP);
|
||||
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
|
||||
storageAccount);
|
||||
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
|
||||
container);
|
||||
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
|
||||
+ getSasKeyExpiryPeriod());
|
||||
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
|
||||
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
||||
this.delegationToken);
|
||||
}
|
||||
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
|
||||
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
|
||||
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
|
||||
"" + getSasKeyExpiryPeriod());
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
sasKeyResponse = makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams());
|
||||
|
||||
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
|
||||
return new URI(sasKeyResponse.getSasKey());
|
||||
} else {
|
||||
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
|
||||
throw new SASKeyGenerationException(
|
||||
"Remote Service encountered error in SAS Key generation : "
|
||||
+ sasKeyResponse.getResponseMessage());
|
||||
}
|
||||
return getSASKey(uriBuilder.build(), connectUgi);
|
||||
} catch (URISyntaxException uriSyntaxEx) {
|
||||
throw new SASKeyGenerationException("Encountered URISyntaxException "
|
||||
+ "while building the HttpGetRequest to remote cred service",
|
||||
throw new SASKeyGenerationException("Encountered URISyntaxException"
|
||||
+ " while building the HttpGetRequest to remote service for ",
|
||||
uriSyntaxEx);
|
||||
} catch (IOException e) {
|
||||
throw new SASKeyGenerationException("Encountered IOException"
|
||||
+ " while building the HttpGetRequest to remote service", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getRelativeBlobSASUri(String storageAccount, String container,
|
||||
String relativePath) throws SASKeyGenerationException {
|
||||
public URI getRelativeBlobSASUri(String storageAccount,
|
||||
String container, String relativePath) throws SASKeyGenerationException {
|
||||
|
||||
try {
|
||||
LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
|
||||
+ " Container {} inside Storage Account {} ",
|
||||
relativePath, container, storageAccount);
|
||||
setDelegationToken();
|
||||
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
|
||||
URIBuilder uriBuilder = new URIBuilder();
|
||||
uriBuilder.setPath("/" + BLOB_SAS_OP);
|
||||
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
|
||||
storageAccount);
|
||||
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
|
||||
container);
|
||||
uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
|
||||
relativePath);
|
||||
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
|
||||
+ getSasKeyExpiryPeriod());
|
||||
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
|
||||
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
|
||||
uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME, relativePath);
|
||||
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
|
||||
"" + getSasKeyExpiryPeriod());
|
||||
|
||||
if (isSecurityEnabled && StringUtils.isNotEmpty(
|
||||
delegationToken)) {
|
||||
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
||||
this.delegationToken);
|
||||
}
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
RemoteSASKeyGenerationResponse sasKeyResponse =
|
||||
makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams());
|
||||
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
|
||||
return new URI(sasKeyResponse.getSasKey());
|
||||
} else {
|
||||
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
|
||||
throw new SASKeyGenerationException(
|
||||
"Remote Service encountered error in SAS Key generation : "
|
||||
+ sasKeyResponse.getResponseMessage());
|
||||
}
|
||||
return getSASKey(uriBuilder.build(), connectUgi);
|
||||
} catch (URISyntaxException uriSyntaxEx) {
|
||||
throw new SASKeyGenerationException("Encountered URISyntaxException"
|
||||
+ " while building the HttpGetRequest to " + " remote service",
|
||||
uriSyntaxEx);
|
||||
} catch (IOException e) {
|
||||
throw new SASKeyGenerationException("Encountered IOException"
|
||||
+ " while building the HttpGetRequest to remote service", e);
|
||||
}
|
||||
}
|
||||
|
||||
private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
|
||||
throws URISyntaxException, SASKeyGenerationException {
|
||||
final RemoteSASKeyGenerationResponse sasKeyResponse;
|
||||
try {
|
||||
sasKeyResponse = connectUgi.doAs(
|
||||
new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
|
||||
@Override
|
||||
public RemoteSASKeyGenerationResponse run() throws Exception {
|
||||
AuthenticatedURL.Token token = null;
|
||||
if (isKerberosSupportEnabled && UserGroupInformation
|
||||
.isSecurityEnabled() && (delegationToken == null
|
||||
|| delegationToken.isEmpty())) {
|
||||
token = new AuthenticatedURL.Token();
|
||||
final Authenticator kerberosAuthenticator =
|
||||
new KerberosDelegationTokenAuthenticator();
|
||||
try {
|
||||
kerberosAuthenticator.authenticate(uri.toURL(), token);
|
||||
Validate.isTrue(token.isSet(),
|
||||
"Authenticated Token is NOT present. "
|
||||
+ "The request cannot proceed.");
|
||||
} catch (AuthenticationException e) {
|
||||
throw new IOException(
|
||||
"Authentication failed in check authorization", e);
|
||||
}
|
||||
}
|
||||
return makeRemoteRequest(uri,
|
||||
(token != null ? token.toString() : null));
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException | IOException e) {
|
||||
final String msg = "Error fetching SAS Key from Remote Service: " + uri;
|
||||
LOG.error(msg, e);
|
||||
if (e instanceof InterruptedException) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
throw new SASKeyGenerationException(msg, e);
|
||||
}
|
||||
|
||||
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
|
||||
return new URI(sasKeyResponse.getSasKey());
|
||||
} else {
|
||||
throw new SASKeyGenerationException(
|
||||
"Remote Service encountered error in SAS Key generation : "
|
||||
+ sasKeyResponse.getResponseMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to make a remote request.
|
||||
* @param uri - Uri to use for the remote request
|
||||
* @param token - hadoop.auth token for the remote request
|
||||
*
|
||||
* @param urls - Urls to use for the remote request
|
||||
* @param path - hadoop.auth token for the remote request
|
||||
* @param queryParams - queryParams to be used.
|
||||
* @return RemoteSASKeyGenerationResponse
|
||||
*/
|
||||
private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
|
||||
String token) throws SASKeyGenerationException {
|
||||
private RemoteSASKeyGenerationResponse makeRemoteRequest(String[] urls,
|
||||
String path, List<NameValuePair> queryParams)
|
||||
throws SASKeyGenerationException {
|
||||
|
||||
try {
|
||||
HttpGet httpGet = new HttpGet(uri);
|
||||
if (token != null) {
|
||||
httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
|
||||
}
|
||||
String responseBody = remoteCallHelper.makeRemoteGetRequest(httpGet);
|
||||
String responseBody = remoteCallHelper
|
||||
.makeRemoteRequest(urls, path, queryParams, HttpGet.METHOD_NAME);
|
||||
return RESPONSE_READER.readValue(responseBody);
|
||||
|
||||
} catch (WasbRemoteCallException remoteCallEx) {
|
||||
throw new SASKeyGenerationException("Encountered RemoteCallException"
|
||||
+ " while retrieving SAS key from remote service", remoteCallEx);
|
||||
} catch (JsonParseException jsonParserEx) {
|
||||
throw new SASKeyGenerationException("Encountered JsonParseException "
|
||||
+ "while parsing the response from remote"
|
||||
+ " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
|
||||
+ " service into RemoteSASKeyGenerationResponse object",
|
||||
jsonParserEx);
|
||||
} catch (JsonMappingException jsonMappingEx) {
|
||||
throw new SASKeyGenerationException("Encountered JsonMappingException"
|
||||
+ " while mapping the response from remote service into "
|
||||
@ -297,10 +233,6 @@ private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
|
||||
+ "accessing remote service to retrieve SAS Key", ioEx);
|
||||
}
|
||||
}
|
||||
|
||||
private void setDelegationToken() throws IOException {
|
||||
this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -309,9 +241,9 @@ private void setDelegationToken() throws IOException {
|
||||
* The remote SAS Key generation service is expected to
|
||||
* return SAS key in json format:
|
||||
* {
|
||||
* "responseCode" : 0 or non-zero <int>,
|
||||
* "responseMessage" : relavant message on failure <String>,
|
||||
* "sasKey" : Requested SAS Key <String>
|
||||
* "responseCode" : 0 or non-zero <int>,
|
||||
* "responseMessage" : relavant message on failure <String>,
|
||||
* "sasKey" : Requested SAS Key <String>
|
||||
* }
|
||||
*/
|
||||
class RemoteSASKeyGenerationResponse {
|
||||
|
@ -24,23 +24,17 @@
|
||||
import com.fasterxml.jackson.databind.ObjectReader;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang.Validate;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.SecurityUtils;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
|
||||
|
||||
@ -55,54 +49,59 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
|
||||
public static final Logger LOG = LoggerFactory
|
||||
.getLogger(RemoteWasbAuthorizerImpl.class);
|
||||
private static final ObjectReader RESPONSE_READER = new ObjectMapper()
|
||||
.readerFor(RemoteAuthorizerResponse.class);
|
||||
|
||||
private String remoteAuthorizerServiceUrl = null;
|
||||
.readerFor(RemoteWasbAuthorizerResponse.class);
|
||||
|
||||
/**
|
||||
* Configuration parameter name expected in the Configuration object to
|
||||
* provide the url of the remote service. {@value}
|
||||
* provide the urls of the remote service instances. {@value}
|
||||
*/
|
||||
public static final String KEY_REMOTE_AUTH_SERVICE_URL =
|
||||
"fs.azure.authorization.remote.service.url";
|
||||
|
||||
public static final String KEY_REMOTE_AUTH_SERVICE_URLS =
|
||||
"fs.azure.authorization.remote.service.urls";
|
||||
/**
|
||||
* Authorization operation OP name in the remote service {@value}
|
||||
*/
|
||||
private static final String CHECK_AUTHORIZATION_OP =
|
||||
"CHECK_AUTHORIZATION";
|
||||
|
||||
private static final String CHECK_AUTHORIZATION_OP = "CHECK_AUTHORIZATION";
|
||||
/**
|
||||
* Query parameter specifying the access operation type. {@value}
|
||||
*/
|
||||
private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
|
||||
"operation_type";
|
||||
|
||||
/**
|
||||
* Query parameter specifying the wasb absolute path. {@value}
|
||||
*/
|
||||
private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
|
||||
"wasb_absolute_path";
|
||||
|
||||
/**
|
||||
* Query parameter name for user info {@value}
|
||||
*/
|
||||
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
|
||||
"delegation";
|
||||
|
||||
/**
|
||||
* Query parameter name for sending owner of the specific resource {@value}
|
||||
*/
|
||||
private static final String WASB_RESOURCE_OWNER_QUERY_PARAM_NAME =
|
||||
"wasb_resource_owner";
|
||||
|
||||
private WasbRemoteCallHelper remoteCallHelper = null;
|
||||
private String delegationToken;
|
||||
private boolean isSecurityEnabled;
|
||||
private boolean isKerberosSupportEnabled;
|
||||
/**
|
||||
* Authorization Remote http client retry policy enabled configuration key. {@value}
|
||||
*/
|
||||
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
|
||||
"fs.azure.authorizer.http.retry.policy.enabled";
|
||||
|
||||
@VisibleForTesting
|
||||
public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
|
||||
/**
|
||||
* Authorization Remote http client retry policy spec. {@value}
|
||||
*/
|
||||
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC =
|
||||
"fs.azure.authorizer.http.retry.policy.spec";
|
||||
|
||||
/**
|
||||
* Authorization Remote http client retry policy spec default value. {@value}
|
||||
*/
|
||||
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||
"1000,3,10000,2";
|
||||
|
||||
private WasbRemoteCallHelper remoteCallHelper = null;
|
||||
private boolean isKerberosSupportEnabled;
|
||||
private RetryPolicy retryPolicy;
|
||||
private String[] commaSeparatedUrls = null;
|
||||
|
||||
@VisibleForTesting public void updateWasbRemoteCallHelper(
|
||||
WasbRemoteCallHelper helper) {
|
||||
this.remoteCallHelper = helper;
|
||||
}
|
||||
|
||||
@ -110,114 +109,63 @@ public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
|
||||
public void init(Configuration conf)
|
||||
throws WasbAuthorizationException, IOException {
|
||||
LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
|
||||
setDelegationToken();
|
||||
remoteAuthorizerServiceUrl = SecurityUtils
|
||||
.getRemoteAuthServiceUrls(conf);
|
||||
|
||||
if (remoteAuthorizerServiceUrl == null
|
||||
|| remoteAuthorizerServiceUrl.isEmpty()) {
|
||||
throw new WasbAuthorizationException(
|
||||
"fs.azure.authorization.remote.service.url config not set"
|
||||
+ " in configuration.");
|
||||
this.isKerberosSupportEnabled =
|
||||
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||
this.commaSeparatedUrls =
|
||||
conf.getTrimmedStrings(KEY_REMOTE_AUTH_SERVICE_URLS);
|
||||
if (this.commaSeparatedUrls == null
|
||||
|| this.commaSeparatedUrls.length <= 0) {
|
||||
throw new IOException(KEY_REMOTE_AUTH_SERVICE_URLS + " config not set"
|
||||
+ " in configuration.");
|
||||
}
|
||||
this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
|
||||
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
|
||||
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC,
|
||||
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
||||
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
|
||||
} else {
|
||||
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
||||
}
|
||||
|
||||
this.remoteCallHelper = new WasbRemoteCallHelper();
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
this.isKerberosSupportEnabled = conf
|
||||
.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean authorize(String wasbAbsolutePath, String accessType, String resourceOwner)
|
||||
throws WasbAuthorizationException, IOException {
|
||||
|
||||
try {
|
||||
|
||||
try {
|
||||
/* Make an exception for the internal -RenamePending files */
|
||||
if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
setDelegationToken();
|
||||
URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
|
||||
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
|
||||
uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
|
||||
wasbAbsolutePath);
|
||||
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
|
||||
accessType);
|
||||
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
|
||||
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
|
||||
delegationToken);
|
||||
}
|
||||
if (resourceOwner != null && StringUtils.isNotEmpty(resourceOwner)) {
|
||||
uriBuilder.addParameter(WASB_RESOURCE_OWNER_QUERY_PARAM_NAME,
|
||||
resourceOwner);
|
||||
}
|
||||
|
||||
String responseBody = null;
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
} else {
|
||||
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
|
||||
}
|
||||
|
||||
try {
|
||||
responseBody = connectUgi
|
||||
.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override
|
||||
public String run() throws Exception {
|
||||
AuthenticatedURL.Token token = null;
|
||||
HttpGet httpGet = new HttpGet(uriBuilder.build());
|
||||
if (isKerberosSupportEnabled && UserGroupInformation
|
||||
.isSecurityEnabled() && (delegationToken == null
|
||||
|| delegationToken.isEmpty())) {
|
||||
token = new AuthenticatedURL.Token();
|
||||
final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
|
||||
try {
|
||||
kerberosAuthenticator
|
||||
.authenticate(uriBuilder.build().toURL(), token);
|
||||
Validate.isTrue(token.isSet(),
|
||||
"Authenticated Token is NOT present. The request cannot proceed.");
|
||||
} catch (AuthenticationException e){
|
||||
throw new IOException("Authentication failed in check authorization", e);
|
||||
}
|
||||
if (token != null) {
|
||||
httpGet.setHeader("Cookie",
|
||||
AuthenticatedURL.AUTH_COOKIE + "=" + token);
|
||||
}
|
||||
}
|
||||
return remoteCallHelper.makeRemoteGetRequest(httpGet);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Error in check authorization", e);
|
||||
throw new WasbAuthorizationException("Error in check authorize", e);
|
||||
}
|
||||
|
||||
RemoteAuthorizerResponse authorizerResponse =
|
||||
RESPONSE_READER.readValue(responseBody);
|
||||
|
||||
if (authorizerResponse == null) {
|
||||
throw new WasbAuthorizationException(
|
||||
"RemoteAuthorizerResponse object null from remote call");
|
||||
} else if (authorizerResponse.getResponseCode()
|
||||
== REMOTE_CALL_SUCCESS_CODE) {
|
||||
return authorizerResponse.getAuthorizationResult();
|
||||
} else {
|
||||
throw new WasbAuthorizationException("Remote authorization"
|
||||
+ " service encountered an error "
|
||||
+ authorizerResponse.getResponseMessage());
|
||||
}
|
||||
} catch (URISyntaxException | WasbRemoteCallException
|
||||
| JsonParseException | JsonMappingException ex) {
|
||||
throw new WasbAuthorizationException(ex);
|
||||
final URIBuilder uriBuilder = new URIBuilder();
|
||||
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
|
||||
uriBuilder
|
||||
.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME, wasbAbsolutePath);
|
||||
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME, accessType);
|
||||
if (resourceOwner != null && StringUtils.isNotEmpty(resourceOwner)) {
|
||||
uriBuilder.addParameter(WASB_RESOURCE_OWNER_QUERY_PARAM_NAME,
|
||||
resourceOwner);
|
||||
}
|
||||
}
|
||||
|
||||
private void setDelegationToken() throws IOException {
|
||||
this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
|
||||
String responseBody = remoteCallHelper
|
||||
.makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
|
||||
|
||||
RemoteWasbAuthorizerResponse authorizerResponse = RESPONSE_READER
|
||||
.readValue(responseBody);
|
||||
|
||||
if (authorizerResponse == null) {
|
||||
throw new WasbAuthorizationException(
|
||||
"RemoteWasbAuthorizerResponse object null from remote call");
|
||||
} else if (authorizerResponse.getResponseCode()
|
||||
== REMOTE_CALL_SUCCESS_CODE) {
|
||||
return authorizerResponse.getAuthorizationResult();
|
||||
} else {
|
||||
throw new WasbAuthorizationException(
|
||||
"Remote authorization" + " service encountered an error "
|
||||
+ authorizerResponse.getResponseMessage());
|
||||
}
|
||||
} catch (WasbRemoteCallException | JsonParseException | JsonMappingException ex) {
|
||||
throw new WasbAuthorizationException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -227,30 +175,19 @@ private void setDelegationToken() throws IOException {
|
||||
* The remote service is expected to return the authorization
|
||||
* response in the following JSON format
|
||||
* {
|
||||
* "responseCode" : 0 or non-zero <int>,
|
||||
* "responseMessage" : relevant message of failure <String>
|
||||
* "authorizationResult" : authorization result <boolean>
|
||||
* true - if auhorization allowed
|
||||
* false - otherwise.
|
||||
*
|
||||
* "responseCode" : 0 or non-zero <int>,
|
||||
* "responseMessage" : relevant message of failure <String>
|
||||
* "authorizationResult" : authorization result <boolean>
|
||||
* true - if auhorization allowed
|
||||
* false - otherwise.
|
||||
* }
|
||||
*/
|
||||
class RemoteAuthorizerResponse {
|
||||
class RemoteWasbAuthorizerResponse {
|
||||
|
||||
private int responseCode;
|
||||
private boolean authorizationResult;
|
||||
private String responseMessage;
|
||||
|
||||
public RemoteAuthorizerResponse(int responseCode,
|
||||
boolean authorizationResult, String message) {
|
||||
this.responseCode = responseCode;
|
||||
this.authorizationResult = authorizationResult;
|
||||
this.responseMessage = message;
|
||||
}
|
||||
|
||||
public RemoteAuthorizerResponse() {
|
||||
}
|
||||
|
||||
public int getResponseCode() {
|
||||
return responseCode;
|
||||
}
|
||||
|
@ -0,0 +1,210 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import org.apache.commons.lang.Validate;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Helper class the has constants and helper methods
|
||||
* used in WASB when integrating with a remote http cred
|
||||
* service which uses Kerberos and delegation tokens.
|
||||
* Currently, remote service will be used to generate
|
||||
* SAS keys, authorization and delegation token operations.
|
||||
*/
|
||||
public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(SecureWasbRemoteCallHelper.class);
|
||||
/**
|
||||
* Delegation token query parameter to be used when making rest call.
|
||||
*/
|
||||
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME = "delegation";
|
||||
|
||||
/**
|
||||
* Delegation token to be used for making the remote call.
|
||||
*/
|
||||
private Token<?> delegationToken = null;
|
||||
|
||||
/**
|
||||
* Does Remote Http Call requires Kerberos Authentication always, even if the delegation token is present.
|
||||
*/
|
||||
private boolean alwaysRequiresKerberosAuth;
|
||||
|
||||
public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
|
||||
boolean alwaysRequiresKerberosAuth) {
|
||||
super(retryPolicy);
|
||||
this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String makeRemoteRequest(final String[] urls,
|
||||
final String path, final List<NameValuePair> queryParams,
|
||||
final String httpMethod) throws IOException {
|
||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
}
|
||||
if (delegationToken == null) {
|
||||
connectUgi.checkTGTAndReloginFromKeytab();
|
||||
}
|
||||
String s = null;
|
||||
try {
|
||||
s = connectUgi.doAs(new PrivilegedExceptionAction<String>() {
|
||||
@Override public String run() throws Exception {
|
||||
return retryableRequest(urls, path, queryParams, httpMethod);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException(e.getMessage(), e);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpUriRequest getHttpRequest(String[] urls, String path,
|
||||
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
|
||||
throws URISyntaxException, IOException {
|
||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
if (connectUgi != null) {
|
||||
queryParams.add(new NameValuePair() {
|
||||
@Override public String getName() {
|
||||
return Constants.DOAS_PARAM;
|
||||
}
|
||||
|
||||
@Override public String getValue() {
|
||||
return ugi.getShortUserName();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final Token delegationToken = getDelegationToken(ugi);
|
||||
if (!alwaysRequiresKerberosAuth && delegationToken != null) {
|
||||
final String delegationTokenEncodedUrlString =
|
||||
delegationToken.encodeToUrlString();
|
||||
queryParams.add(new NameValuePair() {
|
||||
@Override public String getName() {
|
||||
return DELEGATION_TOKEN_QUERY_PARAM_NAME;
|
||||
}
|
||||
|
||||
@Override public String getValue() {
|
||||
return delegationTokenEncodedUrlString;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
URIBuilder uriBuilder =
|
||||
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
||||
HttpUriRequest httpUriRequest = null;
|
||||
switch (httpMethod) {
|
||||
case HttpPut.METHOD_NAME:
|
||||
httpUriRequest = new HttpPut(uriBuilder.build());
|
||||
break;
|
||||
case HttpPost.METHOD_NAME:
|
||||
httpUriRequest = new HttpPost(uriBuilder.build());
|
||||
break;
|
||||
default:
|
||||
httpUriRequest = new HttpGet(uriBuilder.build());
|
||||
break;
|
||||
}
|
||||
|
||||
LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
|
||||
uriBuilder.build().toURL());
|
||||
if (alwaysRequiresKerberosAuth || delegationToken == null) {
|
||||
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
||||
final Authenticator kerberosAuthenticator =
|
||||
new KerberosDelegationTokenAuthenticator();
|
||||
try {
|
||||
kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
|
||||
} catch (AuthenticationException e) {
|
||||
throw new WasbRemoteCallException(
|
||||
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
|
||||
}
|
||||
Validate.isTrue(token.isSet(),
|
||||
"Authenticated Token is NOT present. The request cannot proceed.");
|
||||
|
||||
httpUriRequest.setHeader("Cookie",
|
||||
AuthenticatedURL.AUTH_COOKIE + "=" + token);
|
||||
}
|
||||
return httpUriRequest;
|
||||
}
|
||||
|
||||
private synchronized Token<?> getDelegationToken(
|
||||
UserGroupInformation userGroupInformation) throws IOException {
|
||||
if (this.delegationToken == null) {
|
||||
Token<?> token = null;
|
||||
for (Token iterToken : userGroupInformation.getTokens()) {
|
||||
if (iterToken.getKind()
|
||||
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
|
||||
token = iterToken;
|
||||
LOG.debug("{} token found in cache : {}",
|
||||
WasbDelegationTokenIdentifier.TOKEN_KIND, iterToken);
|
||||
break;
|
||||
}
|
||||
}
|
||||
LOG.debug("UGI Information: {}", userGroupInformation.toString());
|
||||
|
||||
// ugi tokens are usually indicative of a task which can't
|
||||
// refetch tokens. even if ugi has credentials, don't attempt
|
||||
// to get another token to match hdfs/rpc behavior
|
||||
if (token != null) {
|
||||
LOG.debug("Using UGI token: {}", token);
|
||||
setDelegationToken(token);
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Delegation token from cache - {}", delegationToken != null
|
||||
? delegationToken.encodeToUrlString()
|
||||
: "null");
|
||||
}
|
||||
return this.delegationToken;
|
||||
}
|
||||
|
||||
private <T extends TokenIdentifier> void setDelegationToken(
|
||||
final Token<T> token) {
|
||||
synchronized (this) {
|
||||
this.delegationToken = token;
|
||||
}
|
||||
}
|
||||
}
|
@ -6,9 +6,9 @@
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -19,19 +19,31 @@
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.NameValuePair;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.client.ClientProtocolException;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
* Helper class the has constants and helper methods
|
||||
@ -39,101 +51,212 @@
|
||||
* service. Currently, remote service will be used to generate
|
||||
* SAS keys.
|
||||
*/
|
||||
class WasbRemoteCallHelper {
|
||||
public class WasbRemoteCallHelper {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(WasbRemoteCallHelper.class);
|
||||
/**
|
||||
* Return code when the remote call is successful. {@value}
|
||||
*/
|
||||
public static final int REMOTE_CALL_SUCCESS_CODE = 0;
|
||||
|
||||
/**
|
||||
* Application Json content type.
|
||||
*/
|
||||
private static final String APPLICATION_JSON = "application/json";
|
||||
|
||||
/**
|
||||
* Max content length of the response.
|
||||
*/
|
||||
private static final int MAX_CONTENT_LENGTH = 1024;
|
||||
|
||||
/**
|
||||
* Client instance to be used for making the remote call.
|
||||
*/
|
||||
private HttpClient client = null;
|
||||
|
||||
private Random random = new Random();
|
||||
|
||||
private RetryPolicy retryPolicy = null;
|
||||
|
||||
public WasbRemoteCallHelper(RetryPolicy retryPolicy) {
|
||||
this.client = HttpClientBuilder.create().build();
|
||||
this.retryPolicy = retryPolicy;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void updateHttpClient(HttpClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
public WasbRemoteCallHelper() {
|
||||
this.client = HttpClientBuilder.create().build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to make remote HTTP Get request.
|
||||
* @param getRequest - HttpGet request object constructed by caller.
|
||||
*
|
||||
* @param urls - Service urls to be used, if one fails try another.
|
||||
* @param path - URL endpoint for the resource.
|
||||
* @param queryParams - list of query parameters
|
||||
* @param httpMethod - http Method to be used.
|
||||
* @return Http Response body returned as a string. The caller
|
||||
* is expected to semantically understand the response.
|
||||
* @throws WasbRemoteCallException
|
||||
* @throws IOException
|
||||
* is expected to semantically understand the response.
|
||||
* @throws IOException when there an error in executing the remote http request.
|
||||
*/
|
||||
public String makeRemoteGetRequest(HttpGet getRequest)
|
||||
throws WasbRemoteCallException, IOException {
|
||||
public String makeRemoteRequest(String[] urls, String path,
|
||||
List<NameValuePair> queryParams, String httpMethod) throws IOException {
|
||||
|
||||
try {
|
||||
return retryableRequest(urls, path, queryParams, httpMethod);
|
||||
}
|
||||
|
||||
final String APPLICATION_JSON = "application/json";
|
||||
final int MAX_CONTENT_LENGTH = 1024;
|
||||
protected String retryableRequest(String[] urls, String path,
|
||||
List<NameValuePair> queryParams, String httpMethod) throws IOException {
|
||||
HttpResponse response = null;
|
||||
HttpUriRequest httpRequest = null;
|
||||
|
||||
getRequest.setHeader("Accept", APPLICATION_JSON);
|
||||
|
||||
HttpResponse response = client.execute(getRequest);
|
||||
|
||||
StatusLine statusLine = response.getStatusLine();
|
||||
if (statusLine == null || statusLine.getStatusCode() != HttpStatus.SC_OK) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
((statusLine!=null) ? statusLine.toString() : "NULL")
|
||||
);
|
||||
}
|
||||
|
||||
Header contentTypeHeader = response.getFirstHeader("Content-Type");
|
||||
if (contentTypeHeader == null
|
||||
|| !APPLICATION_JSON.equals(contentTypeHeader.getValue())) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Content-Type mismatch: expected: " + APPLICATION_JSON +
|
||||
", got " + ((contentTypeHeader!=null) ? contentTypeHeader.getValue() : "NULL")
|
||||
);
|
||||
}
|
||||
|
||||
Header contentLengthHeader = response.getFirstHeader("Content-Length");
|
||||
if (contentLengthHeader == null) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Content-Length header missing"
|
||||
);
|
||||
for (int retry = 0, index =
|
||||
random.nextInt(urls.length);; retry++, index++) {
|
||||
if (index >= urls.length) {
|
||||
index = index % urls.length;
|
||||
}
|
||||
|
||||
try {
|
||||
if (Integer.parseInt(contentLengthHeader.getValue()) > MAX_CONTENT_LENGTH) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Content-Length:" + contentLengthHeader.getValue() +
|
||||
"exceeded max:" + MAX_CONTENT_LENGTH
|
||||
);
|
||||
httpRequest =
|
||||
getHttpRequest(urls, path, queryParams, index, httpMethod);
|
||||
|
||||
httpRequest.setHeader("Accept", APPLICATION_JSON);
|
||||
response = client.execute(httpRequest);
|
||||
StatusLine statusLine = response.getStatusLine();
|
||||
if (statusLine == null
|
||||
|| statusLine.getStatusCode() != HttpStatus.SC_OK) {
|
||||
throw new WasbRemoteCallException(
|
||||
httpRequest.getURI().toString() + ":" + ((statusLine != null)
|
||||
? statusLine.toString()
|
||||
: "NULL"));
|
||||
}
|
||||
|
||||
Header contentTypeHeader = response.getFirstHeader("Content-Type");
|
||||
if (contentTypeHeader == null || !APPLICATION_JSON
|
||||
.equals(contentTypeHeader.getValue())) {
|
||||
throw new WasbRemoteCallException(
|
||||
httpRequest.getURI().toString() + ":"
|
||||
+ "Content-Type mismatch: expected: " + APPLICATION_JSON
|
||||
+ ", got " + ((contentTypeHeader != null) ? contentTypeHeader
|
||||
.getValue() : "NULL"));
|
||||
}
|
||||
|
||||
Header contentLengthHeader = response.getFirstHeader("Content-Length");
|
||||
if (contentLengthHeader == null) {
|
||||
throw new WasbRemoteCallException(
|
||||
httpRequest.getURI().toString() + ":"
|
||||
+ "Content-Length header missing");
|
||||
}
|
||||
|
||||
try {
|
||||
if (Integer.parseInt(contentLengthHeader.getValue())
|
||||
> MAX_CONTENT_LENGTH) {
|
||||
throw new WasbRemoteCallException(
|
||||
httpRequest.getURI().toString() + ":" + "Content-Length:"
|
||||
+ contentLengthHeader.getValue() + "exceeded max:"
|
||||
+ MAX_CONTENT_LENGTH);
|
||||
}
|
||||
} catch (NumberFormatException nfe) {
|
||||
throw new WasbRemoteCallException(
|
||||
httpRequest.getURI().toString() + ":"
|
||||
+ "Invalid Content-Length value :" + contentLengthHeader
|
||||
.getValue());
|
||||
}
|
||||
|
||||
BufferedReader rd = null;
|
||||
StringBuilder responseBody = new StringBuilder();
|
||||
try {
|
||||
rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent(),
|
||||
StandardCharsets.UTF_8));
|
||||
String responseLine = "";
|
||||
while ((responseLine = rd.readLine()) != null) {
|
||||
responseBody.append(responseLine);
|
||||
}
|
||||
} finally {
|
||||
rd.close();
|
||||
}
|
||||
return responseBody.toString();
|
||||
} catch (URISyntaxException uriSyntaxEx) {
|
||||
throw new WasbRemoteCallException("Encountered URISyntaxException "
|
||||
+ "while building the HttpGetRequest to remote service",
|
||||
uriSyntaxEx);
|
||||
} catch (IOException e) {
|
||||
LOG.debug(e.getMessage(), e);
|
||||
try {
|
||||
shouldRetry(e, retry, (httpRequest != null)
|
||||
? httpRequest.getURI().toString()
|
||||
: urls[index]);
|
||||
} catch (IOException ioex) {
|
||||
String message =
|
||||
"Encountered error while making remote call to " + String
|
||||
.join(",", urls) + " retried " + retry + " time(s).";
|
||||
LOG.error(message, ioex);
|
||||
throw new WasbRemoteCallException(message, ioex);
|
||||
}
|
||||
}
|
||||
catch (NumberFormatException nfe) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Invalid Content-Length value :" + contentLengthHeader.getValue()
|
||||
);
|
||||
}
|
||||
|
||||
BufferedReader rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent(),
|
||||
StandardCharsets.UTF_8));
|
||||
StringBuilder responseBody = new StringBuilder();
|
||||
String responseLine = "";
|
||||
while ((responseLine = rd.readLine()) != null) {
|
||||
responseBody.append(responseLine);
|
||||
}
|
||||
rd.close();
|
||||
return responseBody.toString();
|
||||
|
||||
} catch (ClientProtocolException clientProtocolEx) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Encountered ClientProtocolException while making remote call", clientProtocolEx);
|
||||
} catch (IOException ioEx) {
|
||||
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
|
||||
"Encountered IOException while making remote call", ioEx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected HttpUriRequest getHttpRequest(String[] urls, String path,
|
||||
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
|
||||
throws URISyntaxException, IOException {
|
||||
URIBuilder uriBuilder = null;
|
||||
uriBuilder =
|
||||
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
||||
HttpUriRequest httpUriRequest = null;
|
||||
switch (httpMethod) {
|
||||
case HttpPut.METHOD_NAME:
|
||||
httpUriRequest = new HttpPut(uriBuilder.build());
|
||||
break;
|
||||
case HttpPost.METHOD_NAME:
|
||||
httpUriRequest = new HttpPost(uriBuilder.build());
|
||||
break;
|
||||
default:
|
||||
httpUriRequest = new HttpGet(uriBuilder.build());
|
||||
break;
|
||||
}
|
||||
return httpUriRequest;
|
||||
}
|
||||
|
||||
private void shouldRetry(final IOException ioe, final int retry,
|
||||
final String url) throws IOException {
|
||||
CharSequence authenticationExceptionMessage =
|
||||
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE;
|
||||
if (ioe instanceof WasbRemoteCallException && ioe.getMessage()
|
||||
.equals(authenticationExceptionMessage)) {
|
||||
throw ioe;
|
||||
}
|
||||
try {
|
||||
final RetryPolicy.RetryAction a = (retryPolicy != null)
|
||||
? retryPolicy
|
||||
.shouldRetry(ioe, retry, 0, true)
|
||||
: RetryPolicy.RetryAction.FAIL;
|
||||
|
||||
boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
|
||||
boolean isFailoverAndRetry =
|
||||
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
|
||||
|
||||
if (isRetry || isFailoverAndRetry) {
|
||||
LOG.debug("Retrying connect to Remote service:{}. Already tried {}"
|
||||
+ " time(s); retry policy is {}, " + "delay {}ms.", url, retry,
|
||||
retryPolicy, a.delayMillis);
|
||||
|
||||
Thread.sleep(a.delayMillis);
|
||||
return;
|
||||
}
|
||||
} catch(InterruptedIOException e) {
|
||||
LOG.warn(e.getMessage(), e);
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Original exception is ", ioe);
|
||||
throw new WasbRemoteCallException(e.getMessage(), e);
|
||||
}
|
||||
LOG.debug("Not retrying anymore, already retried the urls {} time(s)",
|
||||
retry);
|
||||
throw new WasbRemoteCallException(
|
||||
url + ":" + "Encountered IOException while making remote call", ioe);
|
||||
}
|
||||
}
|
||||
|
@ -26,21 +26,6 @@ public final class Constants {
|
||||
private Constants() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration parameter name expected in the Configuration
|
||||
* object to provide the url of the remote service {@value}
|
||||
*/
|
||||
public static final String KEY_CRED_SERVICE_URL = "fs.azure.cred.service.url";
|
||||
/**
|
||||
* Default port of the remote service used as delegation token manager and Azure storage SAS key generator.
|
||||
*/
|
||||
public static final int DEFAULT_CRED_SERVICE_PORT = 50911;
|
||||
|
||||
/**
|
||||
* Default remote delegation token manager endpoint.
|
||||
*/
|
||||
public static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT = "/tokenmanager/v1";
|
||||
|
||||
/**
|
||||
* The configuration property to enable Kerberos support.
|
||||
*/
|
||||
@ -51,4 +36,9 @@ private Constants() {
|
||||
* Parameter to be used for impersonation.
|
||||
*/
|
||||
public static final String DOAS_PARAM = "doas";
|
||||
|
||||
/**
|
||||
* Error message for Authentication failures.
|
||||
*/
|
||||
public static final String AUTHENTICATION_FAILED_ERROR_MESSAGE = "Authentication Failed ";
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure.security;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Utility class to parse JSON.
|
||||
*/
|
||||
public final class JsonUtils {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
|
||||
|
||||
private JsonUtils() {
|
||||
}
|
||||
|
||||
public static Map<?, ?> parse(final String jsonString) throws IOException {
|
||||
try {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.readerFor(Map.class).readValue(jsonString);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("JSON Parsing exception: {} while parsing {}", e.getMessage(),
|
||||
jsonString);
|
||||
if (jsonString.toLowerCase(Locale.ENGLISH).contains("server error")) {
|
||||
LOG.error(
|
||||
"Internal Server Error was encountered while making a request");
|
||||
}
|
||||
throw new IOException("JSON Parsing Error: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,162 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure.security;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azure.SecureWasbRemoteCallHelper;
|
||||
import org.apache.hadoop.fs.azure.WasbRemoteCallHelper;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Class to manage delegation token operations by making rest call to remote service.
|
||||
*/
|
||||
public class RemoteWasbDelegationTokenManager
|
||||
implements WasbDelegationTokenManager {
|
||||
|
||||
/**
|
||||
* Configuration parameter name expected in the configuration
|
||||
* object to provide the url of the delegation token service to fetch the delegation tokens.
|
||||
*/
|
||||
public static final String KEY_DELEGATION_TOKEN_SERVICE_URLS =
|
||||
"fs.azure.delegation.token.service.urls";
|
||||
/**
|
||||
* Configuration key to enable http retry policy for delegation token service calls.
|
||||
*/
|
||||
public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
|
||||
"fs.azure.delegationtokenservice.http.retry.policy.enabled";
|
||||
/**
|
||||
* Configuration key for delegation token service http retry policy spec.
|
||||
*/
|
||||
public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
|
||||
"fs.azure.delegationtokenservice.http.retry.policy.spec";
|
||||
/**
|
||||
* Default remote delegation token manager endpoint.
|
||||
*/
|
||||
private static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT =
|
||||
"/tokenmanager/v1";
|
||||
/**
|
||||
* Default for delegation token service http retry policy spec.
|
||||
*/
|
||||
private static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||
"1000,3,10000,2";
|
||||
|
||||
private static final boolean
|
||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = true;
|
||||
|
||||
private static final Text WASB_DT_SERVICE_NAME = new Text("WASB_DT_SERVICE");
|
||||
/**
|
||||
* Query parameter value for Getting delegation token http request
|
||||
*/
|
||||
private static final String GET_DELEGATION_TOKEN_OP = "GETDELEGATIONTOKEN";
|
||||
/**
|
||||
* Query parameter value for renewing delegation token http request
|
||||
*/
|
||||
private static final String RENEW_DELEGATION_TOKEN_OP = "RENEWDELEGATIONTOKEN";
|
||||
/**
|
||||
* Query parameter value for canceling the delegation token http request
|
||||
*/
|
||||
private static final String CANCEL_DELEGATION_TOKEN_OP = "CANCELDELEGATIONTOKEN";
|
||||
/**
|
||||
* op parameter to represent the operation.
|
||||
*/
|
||||
private static final String OP_PARAM_KEY_NAME = "op";
|
||||
/**
|
||||
* renewer parameter to represent the renewer of the delegation token.
|
||||
*/
|
||||
private static final String RENEWER_PARAM_KEY_NAME = "renewer";
|
||||
/**
|
||||
* service parameter to represent the service which returns delegation tokens.
|
||||
*/
|
||||
private static final String SERVICE_PARAM_KEY_NAME = "service";
|
||||
/**
|
||||
* token parameter to represent the delegation token.
|
||||
*/
|
||||
private static final String TOKEN_PARAM_KEY_NAME = "token";
|
||||
private WasbRemoteCallHelper remoteCallHelper;
|
||||
private String[] dtServiceUrls;
|
||||
|
||||
public RemoteWasbDelegationTokenManager(Configuration conf)
|
||||
throws IOException {
|
||||
RetryPolicy retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
|
||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
|
||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||
|
||||
remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, true);
|
||||
this.dtServiceUrls =
|
||||
conf.getTrimmedStrings(KEY_DELEGATION_TOKEN_SERVICE_URLS);
|
||||
if (this.dtServiceUrls == null || this.dtServiceUrls.length <= 0) {
|
||||
throw new IOException(
|
||||
KEY_DELEGATION_TOKEN_SERVICE_URLS + " config not set"
|
||||
+ " in configuration.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(
|
||||
String renewer) throws IOException {
|
||||
URIBuilder uriBuilder =
|
||||
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
|
||||
.addParameter(OP_PARAM_KEY_NAME, GET_DELEGATION_TOKEN_OP)
|
||||
.addParameter(RENEWER_PARAM_KEY_NAME, renewer)
|
||||
.addParameter(SERVICE_PARAM_KEY_NAME, WASB_DT_SERVICE_NAME.toString());
|
||||
String responseBody = remoteCallHelper
|
||||
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
|
||||
return TokenUtils.toDelegationToken(JsonUtils.parse(responseBody));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(Token<?> token)
|
||||
throws IOException {
|
||||
URIBuilder uriBuilder =
|
||||
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
|
||||
.addParameter(OP_PARAM_KEY_NAME, RENEW_DELEGATION_TOKEN_OP)
|
||||
.addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
|
||||
|
||||
String responseBody = remoteCallHelper
|
||||
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
|
||||
|
||||
Map<?, ?> parsedResp = JsonUtils.parse(responseBody);
|
||||
return ((Number) parsedResp.get("long")).longValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<?> token)
|
||||
throws IOException {
|
||||
URIBuilder uriBuilder =
|
||||
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
|
||||
.addParameter(OP_PARAM_KEY_NAME, CANCEL_DELEGATION_TOKEN_OP)
|
||||
.addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
|
||||
remoteCallHelper.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
|
||||
uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
|
||||
}
|
||||
}
|
@ -1,86 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure.security;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azure.RemoteWasbAuthorizerImpl;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Security Utils class for WASB.
|
||||
*/
|
||||
public final class SecurityUtils {
|
||||
|
||||
private SecurityUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to get remote service URLs from the configuration.
|
||||
* @param conf configuration object.
|
||||
* @return remote service URL
|
||||
* @throws UnknownHostException thrown when getting the default value.
|
||||
*/
|
||||
public static String getCredServiceUrls(Configuration conf)
|
||||
throws UnknownHostException {
|
||||
return conf.get(Constants.KEY_CRED_SERVICE_URL, String
|
||||
.format("http://%s:%s",
|
||||
InetAddress.getLocalHost().getCanonicalHostName(),
|
||||
Constants.DEFAULT_CRED_SERVICE_PORT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to get remote Authorization service URLs from the configuration.
|
||||
* @param conf Configuration object.
|
||||
* @return remote Authorization server URL
|
||||
* @throws UnknownHostException thrown when getting the default value.
|
||||
*/
|
||||
public static String getRemoteAuthServiceUrls(Configuration conf)
|
||||
throws UnknownHostException {
|
||||
return conf.get(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, String
|
||||
.format("http://%s:%s",
|
||||
InetAddress.getLocalHost().getCanonicalHostName(),
|
||||
Constants.DEFAULT_CRED_SERVICE_PORT));
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to get delegation token from the UGI credentials.
|
||||
* @return delegation token
|
||||
* @throws IOException thrown when getting the current user.
|
||||
*/
|
||||
public static String getDelegationTokenFromCredentials() throws IOException {
|
||||
String delegationToken = null;
|
||||
Iterator<Token<? extends TokenIdentifier>> tokenIterator = UserGroupInformation
|
||||
.getCurrentUser().getCredentials().getAllTokens().iterator();
|
||||
while (tokenIterator.hasNext()) {
|
||||
Token<? extends TokenIdentifier> iteratedToken = tokenIterator.next();
|
||||
if (iteratedToken.getKind()
|
||||
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
|
||||
delegationToken = iteratedToken.encodeToUrlString();
|
||||
}
|
||||
}
|
||||
return delegationToken;
|
||||
}
|
||||
}
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure.security;
|
||||
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Utility methods common for token management
|
||||
*/
|
||||
public final class TokenUtils {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(TokenUtils.class);
|
||||
public static final String URL_STRING = "urlString";
|
||||
|
||||
private TokenUtils() {
|
||||
}
|
||||
|
||||
public static Token<DelegationTokenIdentifier> toDelegationToken(
|
||||
final Map<?, ?> inputMap) throws IOException {
|
||||
final Map<?, ?> m = (Map<?, ?>) inputMap.get(Token.class.getSimpleName());
|
||||
return (Token<DelegationTokenIdentifier>) toToken(m);
|
||||
}
|
||||
|
||||
public static Token<? extends TokenIdentifier> toToken(final Map<?, ?> m)
|
||||
throws IOException {
|
||||
if (m == null) {
|
||||
return null;
|
||||
}
|
||||
String urlString = (String) m.get(URL_STRING);
|
||||
if (urlString != null) {
|
||||
final Token<DelegationTokenIdentifier> token = new Token<>();
|
||||
LOG.debug("Read url string param - {}", urlString);
|
||||
token.decodeFromUrlString(urlString);
|
||||
return token;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,54 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure.security;
|
||||
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Interface for Managing the Delegation tokens.
|
||||
*/
|
||||
public interface WasbDelegationTokenManager {
|
||||
|
||||
/**
|
||||
* Get Delegation token
|
||||
* @param renewer delegation token renewer
|
||||
* @return delegation token
|
||||
* @throws IOException when error in getting the delegation token
|
||||
*/
|
||||
Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Renew the delegation token
|
||||
* @param token delegation token.
|
||||
* @return renewed time.
|
||||
* @throws IOException when error in renewing the delegation token
|
||||
*/
|
||||
long renewDelegationToken(Token<?> token) throws IOException;
|
||||
|
||||
/**
|
||||
* Cancel the delegation token
|
||||
* @param token delegation token.
|
||||
* @throws IOException when error in cancelling the delegation token.
|
||||
*/
|
||||
void cancelDelegationToken(Token<?> token) throws IOException;
|
||||
}
|
@ -6,9 +6,9 @@
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -20,27 +20,19 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
/**
|
||||
* Token Renewer for renewing WASB delegation tokens with remote service.
|
||||
*/
|
||||
public class WasbTokenRenewer extends TokenRenewer {
|
||||
public static final Logger LOG = LoggerFactory
|
||||
.getLogger(WasbTokenRenewer.class);
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(WasbTokenRenewer.class);
|
||||
|
||||
/**
|
||||
* Checks if this particular object handles the Kind of token passed.
|
||||
@ -75,32 +67,7 @@ public boolean isManaged(Token<?> token) throws IOException {
|
||||
public long renew(final Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
LOG.debug("Renewing the delegation token");
|
||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
final UserGroupInformation proxyUser = connectUgi;
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
}
|
||||
connectUgi.checkTGTAndReloginFromKeytab();
|
||||
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
|
||||
authToken
|
||||
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
|
||||
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
|
||||
String.format("http://%s:%s",
|
||||
InetAddress.getLocalHost().getCanonicalHostName(),
|
||||
Constants.DEFAULT_CRED_SERVICE_PORT));
|
||||
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
|
||||
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
|
||||
authenticator);
|
||||
|
||||
return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
|
||||
@Override
|
||||
public Long run() throws Exception {
|
||||
return authURL.renewDelegationToken(new URL(credServiceUrl
|
||||
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
|
||||
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
|
||||
}
|
||||
});
|
||||
return getInstance(conf).renewDelegationToken(token);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -114,31 +81,11 @@ public Long run() throws Exception {
|
||||
public void cancel(final Token<?> token, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
LOG.debug("Cancelling the delegation token");
|
||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||
final UserGroupInformation proxyUser = connectUgi;
|
||||
if (connectUgi == null) {
|
||||
connectUgi = ugi;
|
||||
}
|
||||
connectUgi.checkTGTAndReloginFromKeytab();
|
||||
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
|
||||
authToken
|
||||
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
|
||||
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
|
||||
String.format("http://%s:%s",
|
||||
InetAddress.getLocalHost().getCanonicalHostName(),
|
||||
Constants.DEFAULT_CRED_SERVICE_PORT));
|
||||
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
|
||||
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
|
||||
authenticator);
|
||||
connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
authURL.cancelDelegationToken(new URL(credServiceUrl
|
||||
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
|
||||
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
getInstance(conf).cancelDelegationToken(token);
|
||||
}
|
||||
|
||||
private WasbDelegationTokenManager getInstance(Configuration conf)
|
||||
throws IOException {
|
||||
return new RemoteWasbDelegationTokenManager(conf);
|
||||
}
|
||||
}
|
@ -316,12 +316,12 @@ To enable SAS key generation locally following property needs to be set to true.
|
||||
</property>
|
||||
```
|
||||
|
||||
To use the remote SAS key generation mode, an external REST service is expected to provided required SAS keys.
|
||||
To use the remote SAS key generation mode, comma separated external REST services are expected to provided required SAS keys.
|
||||
Following property can used to provide the end point to use for remote SAS Key generation:
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.cred.service.url</name>
|
||||
<name>fs.azure.cred.service.urls</name>
|
||||
<value>{URL}</value>
|
||||
</property>
|
||||
```
|
||||
@ -354,11 +354,11 @@ Authorization support can be enabled in WASB using the following configuration:
|
||||
```
|
||||
|
||||
The current implementation of authorization relies on the presence of an external service that can enforce
|
||||
the authorization. The service is expected to be running on a URL provided by the following config.
|
||||
the authorization. The service is expected to be running on comma separated URLs provided by the following config.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.authorization.remote.service.url</name>
|
||||
<name>fs.azure.authorization.remote.service.urls</name>
|
||||
<value>{URL}</value>
|
||||
</property>
|
||||
```
|
||||
@ -377,6 +377,42 @@ The service is expected to return a response in JSON format:
|
||||
}
|
||||
```
|
||||
|
||||
### Delegation token support in WASB
|
||||
|
||||
Delegation token support support can be enabled in WASB using the following configuration:
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.enable.kerberos.support</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
The current implementation of delegation token implementation relies on the presence of an external service instances that can generate and manage delegation tokens. The service is expected to be running on comma separated URLs provided by the following config.
|
||||
|
||||
```xml
|
||||
<property>
|
||||
<name>fs.azure.delegation.token.service.urls</name>
|
||||
<value>{URL}</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
The remote service is expected to provide support for the following REST call: ```{URL}?op=GETDELEGATIONTOKEN```, ```{URL}?op=RENEWDELEGATIONTOKEN``` and ```{URL}?op=CANCELDELEGATIONTOKEN```
|
||||
An example request:
|
||||
```{URL}?op=GETDELEGATIONTOKEN&renewer=<renewer>```
|
||||
```{URL}?op=RENEWDELEGATIONTOKEN&token=<delegation token>```
|
||||
```{URL}?op=CANCELDELEGATIONTOKEN&token=<delegation token>```
|
||||
|
||||
The service is expected to return a response in JSON format for GETDELEGATIONTOKEN request:
|
||||
|
||||
```json
|
||||
{
|
||||
"Token" : {
|
||||
"urlString": URL string of delegation token.
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Testing the hadoop-azure Module
|
||||
|
||||
The hadoop-azure module includes a full suite of unit tests. Most of the tests
|
||||
|
@ -46,7 +46,7 @@ public class TestNativeAzureFileSystemAuthorization
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
|
||||
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
|
||||
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost/");
|
||||
return AzureBlobStorageTestAccount.create(conf);
|
||||
}
|
||||
|
||||
|
@ -21,34 +21,48 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.http.*;
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.http.Header;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.ProtocolVersion;
|
||||
import org.apache.http.ParseException;
|
||||
import org.apache.http.HeaderElement;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.hamcrest.Description;
|
||||
import org.hamcrest.TypeSafeMatcher;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Mockito.atLeast;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
||||
/**
|
||||
* Test class to hold all WasbRemoteCallHelper tests
|
||||
*/
|
||||
public class TestWasbRemoteCallHelper
|
||||
extends AbstractWasbTestBase {
|
||||
public static final String EMPTY_STRING = "";
|
||||
private static final int INVALID_HTTP_STATUS_CODE_999 = 999;
|
||||
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
|
||||
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
|
||||
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/");
|
||||
return AzureBlobStorageTestAccount.create(conf);
|
||||
}
|
||||
|
||||
@ -80,7 +94,7 @@ public void testInvalidStatusCode() throws Throwable {
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(999));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(INVALID_HTTP_STATUS_CODE_999));
|
||||
// finished setting up mocks
|
||||
|
||||
performop(mockHttpClient);
|
||||
@ -99,7 +113,7 @@ public void testInvalidContentType() throws Throwable {
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "text/plain"));
|
||||
// finished setting up mocks
|
||||
@ -120,7 +134,7 @@ public void testMissingContentLength() throws Throwable {
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
// finished setting up mocks
|
||||
@ -141,7 +155,7 @@ public void testContentLengthExceedsMax() throws Throwable {
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
|
||||
@ -164,7 +178,7 @@ public void testInvalidContentLengthValue() throws Throwable {
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
|
||||
@ -188,7 +202,7 @@ public void testValidJSONResponse() throws Throwable {
|
||||
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
|
||||
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
|
||||
@ -220,7 +234,7 @@ public void testMalFormedJSONResponse() throws Throwable {
|
||||
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
|
||||
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
|
||||
@ -250,7 +264,7 @@ public void testFailureCodeJSONResponse() throws Throwable {
|
||||
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
|
||||
|
||||
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
|
||||
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
|
||||
@ -263,17 +277,155 @@ public void testFailureCodeJSONResponse() throws Throwable {
|
||||
performop(mockHttpClient);
|
||||
}
|
||||
|
||||
private void setupExpectations() throws UnsupportedEncodingException {
|
||||
@Test
|
||||
public void testWhenOneInstanceIsDown() throws Throwable {
|
||||
|
||||
String path = new Path("/").makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
|
||||
String pathEncoded = URLEncoder.encode(path, "UTF-8");
|
||||
// set up mocks
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
|
||||
|
||||
String requestURI = String.format("http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%s&operation_type=write", pathEncoded);
|
||||
HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpResponseService1.getStatusLine())
|
||||
.thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
|
||||
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
|
||||
.thenReturn(newHeader("Content-Length", "1024"));
|
||||
Mockito.when(mockHttpResponseService1.getEntity())
|
||||
.thenReturn(mockHttpEntity);
|
||||
|
||||
HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpResponseService2.getStatusLine())
|
||||
.thenReturn(newStatusLine(HttpStatus.SC_OK));
|
||||
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
|
||||
.thenReturn(newHeader("Content-Length", "1024"));
|
||||
Mockito.when(mockHttpResponseService2.getEntity())
|
||||
.thenReturn(mockHttpEntity);
|
||||
|
||||
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
||||
@Override public boolean matches(Object o) {
|
||||
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
||||
}
|
||||
}
|
||||
class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
|
||||
@Override public boolean matches(Object o) {
|
||||
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
||||
}
|
||||
}
|
||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
||||
.thenReturn(mockHttpResponseService1);
|
||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
||||
.thenReturn(mockHttpResponseService2);
|
||||
|
||||
//Need 3 times because performop() does 3 fs operations.
|
||||
Mockito.when(mockHttpEntity.getContent())
|
||||
.thenReturn(new ByteArrayInputStream(validJsonResponse()
|
||||
.getBytes(StandardCharsets.UTF_8)))
|
||||
.thenReturn(new ByteArrayInputStream(validJsonResponse()
|
||||
.getBytes(StandardCharsets.UTF_8)))
|
||||
.thenReturn(new ByteArrayInputStream(validJsonResponse()
|
||||
.getBytes(StandardCharsets.UTF_8)));
|
||||
// finished setting up mocks
|
||||
|
||||
performop(mockHttpClient);
|
||||
|
||||
Mockito.verify(mockHttpClient, times(3)).execute(Mockito.argThat(new HttpGetForService2()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenServiceInstancesAreDown() throws Throwable {
|
||||
//expectedEx.expect(WasbAuthorizationException.class);
|
||||
// set up mocks
|
||||
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
|
||||
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
|
||||
|
||||
HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpResponseService1.getStatusLine())
|
||||
.thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
|
||||
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
|
||||
.thenReturn(newHeader("Content-Length", "1024"));
|
||||
Mockito.when(mockHttpResponseService1.getEntity())
|
||||
.thenReturn(mockHttpEntity);
|
||||
|
||||
HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
|
||||
Mockito.when(mockHttpResponseService2.getStatusLine())
|
||||
.thenReturn(newStatusLine(
|
||||
HttpStatus.SC_INTERNAL_SERVER_ERROR));
|
||||
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
|
||||
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
|
||||
.thenReturn(newHeader("Content-Length", "1024"));
|
||||
Mockito.when(mockHttpResponseService2.getEntity())
|
||||
.thenReturn(mockHttpEntity);
|
||||
|
||||
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
||||
@Override public boolean matches(Object o) {
|
||||
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
||||
}
|
||||
}
|
||||
class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
|
||||
@Override public boolean matches(Object o) {
|
||||
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
||||
}
|
||||
}
|
||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
||||
.thenReturn(mockHttpResponseService1);
|
||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
||||
.thenReturn(mockHttpResponseService2);
|
||||
|
||||
//Need 3 times because performop() does 3 fs operations.
|
||||
Mockito.when(mockHttpEntity.getContent())
|
||||
.thenReturn(new ByteArrayInputStream(
|
||||
validJsonResponse().getBytes(StandardCharsets.UTF_8)))
|
||||
.thenReturn(new ByteArrayInputStream(
|
||||
validJsonResponse().getBytes(StandardCharsets.UTF_8)))
|
||||
.thenReturn(new ByteArrayInputStream(
|
||||
validJsonResponse().getBytes(StandardCharsets.UTF_8)));
|
||||
// finished setting up mocks
|
||||
try {
|
||||
performop(mockHttpClient);
|
||||
}catch (WasbAuthorizationException e){
|
||||
e.printStackTrace();
|
||||
Mockito.verify(mockHttpClient, atLeast(3))
|
||||
.execute(argThat(new HttpGetForService1()));
|
||||
Mockito.verify(mockHttpClient, atLeast(3))
|
||||
.execute(argThat(new HttpGetForService2()));
|
||||
Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
|
||||
}
|
||||
}
|
||||
|
||||
private void setupExpectations() {
|
||||
expectedEx.expect(WasbAuthorizationException.class);
|
||||
expectedEx.expectMessage("org.apache.hadoop.fs.azure.WasbRemoteCallException: "
|
||||
+ requestURI
|
||||
+ ":Encountered IOException while making remote call"
|
||||
);
|
||||
|
||||
class MatchesPattern extends TypeSafeMatcher<String> {
|
||||
private String pattern;
|
||||
|
||||
MatchesPattern(String pattern) {
|
||||
this.pattern = pattern;
|
||||
}
|
||||
|
||||
@Override protected boolean matchesSafely(String item) {
|
||||
return item.matches(pattern);
|
||||
}
|
||||
|
||||
@Override public void describeTo(Description description) {
|
||||
description.appendText("matches pattern ").appendValue(pattern);
|
||||
}
|
||||
|
||||
@Override protected void describeMismatchSafely(String item,
|
||||
Description mismatchDescription) {
|
||||
mismatchDescription.appendText("does not match");
|
||||
}
|
||||
}
|
||||
|
||||
expectedEx.expectMessage(new MatchesPattern(
|
||||
"org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
|
||||
+ "Encountered error while making remote call to "
|
||||
+ "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/ retried 6 time\\(s\\)\\."));
|
||||
}
|
||||
|
||||
private void performop(HttpClient mockHttpClient) throws Throwable {
|
||||
@ -282,7 +434,10 @@ private void performop(HttpClient mockHttpClient) throws Throwable {
|
||||
|
||||
RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
|
||||
authorizer.init(fs.getConf());
|
||||
WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper();
|
||||
WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper(
|
||||
RetryUtils.getMultipleLinearRandomRetry(new Configuration(),
|
||||
EMPTY_STRING, true,
|
||||
EMPTY_STRING, "1000,3,10000,2"));
|
||||
mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
|
||||
authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
|
||||
fs.updateWasbAuthorizer(authorizer);
|
||||
@ -293,21 +448,26 @@ private void performop(HttpClient mockHttpClient) throws Throwable {
|
||||
}
|
||||
|
||||
private String validJsonResponse() {
|
||||
return new String(
|
||||
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\": \"Authorized\"}"
|
||||
);
|
||||
return "{"
|
||||
+ "\"responseCode\": 0,"
|
||||
+ "\"authorizationResult\": true,"
|
||||
+ "\"responseMessage\": \"Authorized\""
|
||||
+ "}";
|
||||
}
|
||||
|
||||
private String malformedJsonResponse() {
|
||||
return new String(
|
||||
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\":"
|
||||
);
|
||||
return "{"
|
||||
+ "\"responseCode\": 0,"
|
||||
+ "\"authorizationResult\": true,"
|
||||
+ "\"responseMessage\":";
|
||||
}
|
||||
|
||||
private String failureCodeJsonResponse() {
|
||||
return new String(
|
||||
"{\"responseCode\": 1, \"authorizationResult\": false, \"responseMessage\": \"Unauthorized\"}"
|
||||
);
|
||||
return "{"
|
||||
+ "\"responseCode\": 1,"
|
||||
+ "\"authorizationResult\": false,"
|
||||
+ "\"responseMessage\": \"Unauthorized\""
|
||||
+ "}";
|
||||
}
|
||||
|
||||
private StatusLine newStatusLine(int statusCode) {
|
||||
@ -347,4 +507,10 @@ public HeaderElement[] getElements() throws ParseException {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/** Check that a HttpGet request is with given remote host. */
|
||||
private static boolean checkHttpGetMatchHost(HttpGet g, String h) {
|
||||
return g != null && g.getURI().getHost().equals(h);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user