HDFS-8155. Support OAuth2 in WebHDFS.
This commit is contained in:
parent
6ab2d19f5c
commit
837fb75e8e
@ -31,6 +31,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.squareup.okhttp</groupId>
|
||||
<artifactId>okhttp</artifactId>
|
||||
<version>2.4.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
|
@ -36,6 +36,14 @@ public interface HdfsClientConfigKeys {
|
||||
String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
|
||||
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
|
||||
|
||||
String DFS_WEBHDFS_OAUTH_ENABLED_KEY = "dfs.webhdfs.oauth2.enabled";
|
||||
boolean DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT = false;
|
||||
|
||||
String OAUTH_CLIENT_ID_KEY = "dfs.webhdfs.oauth2.client.id";
|
||||
String OAUTH_REFRESH_URL_KEY = "dfs.webhdfs.oauth2.refresh.url";
|
||||
|
||||
String ACCESS_TOKEN_PROVIDER_KEY = "dfs.webhdfs.oauth2.access.token.provider";
|
||||
|
||||
String PREFIX = "dfs.client.";
|
||||
String DFS_NAMESERVICES = "dfs.nameservices";
|
||||
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
@ -77,15 +78,42 @@ public HttpURLConnection configure(HttpURLConnection conn)
|
||||
* try to load SSL certificates when it is specified.
|
||||
*/
|
||||
public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
|
||||
ConnectionConfigurator conn = getSSLConnectionConfiguration(conf);
|
||||
|
||||
return new URLConnectionFactory(conn);
|
||||
}
|
||||
|
||||
private static ConnectionConfigurator
|
||||
getSSLConnectionConfiguration(Configuration conf) {
|
||||
ConnectionConfigurator conn = null;
|
||||
try {
|
||||
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
|
||||
} catch (Exception e) {
|
||||
LOG.debug(
|
||||
"Cannot load customized ssl related configuration. Fallback to system-generic settings.",
|
||||
"Cannot load customized ssl related configuration. Fallback to" +
|
||||
" system-generic settings.",
|
||||
e);
|
||||
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a new URLConnectionFactory that supports OAut-based connections.
|
||||
* It will also try to load the SSL configuration when they are specified.
|
||||
*/
|
||||
public static URLConnectionFactory
|
||||
newOAuth2URLConnectionFactory(Configuration conf) throws IOException {
|
||||
ConnectionConfigurator conn = null;
|
||||
try {
|
||||
ConnectionConfigurator sslConnConfigurator
|
||||
= newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
|
||||
|
||||
conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Unable to load OAuth2 connection factory.", e);
|
||||
}
|
||||
return new URLConnectionFactory(conn);
|
||||
}
|
||||
|
||||
|
@ -149,8 +149,19 @@ public synchronized void initialize(URI uri, Configuration conf
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
|
||||
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
boolean isOAuth = conf.getBoolean(
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
|
||||
|
||||
if(isOAuth) {
|
||||
LOG.info("Enabling OAuth2 in WebHDFS");
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newOAuth2URLConnectionFactory(conf);
|
||||
} else {
|
||||
LOG.info("Not enabling OAuth2 in WebHDFS");
|
||||
connectionFactory = URLConnectionFactory
|
||||
.newDefaultURLConnectionFactory(conf);
|
||||
}
|
||||
|
||||
|
||||
ugi = UserGroupInformation.getCurrentUser();
|
||||
|
@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configurable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Provide an OAuth2 access token to be used to authenticate http calls in
|
||||
* WebHDFS.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class AccessTokenProvider implements Configurable {
|
||||
private Configuration conf;
|
||||
|
||||
/**
|
||||
* Obtain the access token that should be added to http connection's header.
|
||||
* Will be called for each connection, so implementations should be
|
||||
* performant. Implementations are responsible for any refreshing of
|
||||
* the token.
|
||||
*
|
||||
* @return Access token to be added to connection header.
|
||||
*/
|
||||
abstract String getAccessToken() throws IOException;
|
||||
|
||||
/**
|
||||
* Return the conf.
|
||||
*
|
||||
* @return the conf.
|
||||
*/
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the conf.
|
||||
*
|
||||
* @param configuration New configuration.
|
||||
*/
|
||||
@Override
|
||||
public void setConf(Configuration configuration) {
|
||||
this.conf = configuration;
|
||||
}
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
/**
|
||||
* Access tokens generally expire. This timer helps keep track of that.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class AccessTokenTimer {
|
||||
public static final long EXPIRE_BUFFER_MS = 30 * 1000L;
|
||||
|
||||
private final Timer timer;
|
||||
|
||||
/**
|
||||
* When the current access token will expire in milliseconds since
|
||||
* epoch.
|
||||
*/
|
||||
private long nextRefreshMSSinceEpoch;
|
||||
|
||||
public AccessTokenTimer() {
|
||||
this(new Timer());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param timer Timer instance for unit testing
|
||||
*/
|
||||
public AccessTokenTimer(Timer timer) {
|
||||
this.timer = timer;
|
||||
this.nextRefreshMSSinceEpoch = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set when the access token will expire as reported by the oauth server,
|
||||
* ie in seconds from now.
|
||||
* @param expiresIn Access time expiration as reported by OAuth server
|
||||
*/
|
||||
public void setExpiresIn(String expiresIn) {
|
||||
this.nextRefreshMSSinceEpoch = convertExpiresIn(timer, expiresIn);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set when the access token will expire in milliseconds from epoch,
|
||||
* as required by the WebHDFS configuration. This is a bit hacky and lame.
|
||||
*
|
||||
* @param expiresInMSSinceEpoch Access time expiration in ms since epoch.
|
||||
*/
|
||||
public void setExpiresInMSSinceEpoch(String expiresInMSSinceEpoch){
|
||||
this.nextRefreshMSSinceEpoch = Long.parseLong(expiresInMSSinceEpoch);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next time we should refresh the token.
|
||||
*
|
||||
* @return Next time since epoch we'll need to refresh the token.
|
||||
*/
|
||||
public long getNextRefreshMSSinceEpoch() {
|
||||
return nextRefreshMSSinceEpoch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the current token has expired or will expire within the
|
||||
* EXPIRE_BUFFER_MS (to give ample wiggle room for the call to be made to
|
||||
* the server).
|
||||
*/
|
||||
public boolean shouldRefresh() {
|
||||
long lowerLimit = nextRefreshMSSinceEpoch - EXPIRE_BUFFER_MS;
|
||||
long currTime = timer.now();
|
||||
return currTime > lowerLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* The expires_in param from OAuth is in seconds-from-now. Convert to
|
||||
* milliseconds-from-epoch
|
||||
*/
|
||||
static Long convertExpiresIn(Timer timer, String expiresInSecs) {
|
||||
long expiresSecs = Long.parseLong(expiresInSecs);
|
||||
long expiresMs = expiresSecs * 1000;
|
||||
return timer.now() + expiresMs;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
|
||||
|
||||
/**
|
||||
* Obtain an access token via a a credential (provided through the
|
||||
* Configuration) using the
|
||||
* <a href="https://tools.ietf.org/html/rfc6749#section-4.4">
|
||||
* Client Credentials Grant workflow</a>.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ConfCredentialBasedAccessTokenProvider
|
||||
extends CredentialBasedAccessTokenProvider {
|
||||
private String credential;
|
||||
|
||||
public ConfCredentialBasedAccessTokenProvider() {
|
||||
}
|
||||
|
||||
public ConfCredentialBasedAccessTokenProvider(Timer timer) {
|
||||
super(timer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
credential = notNull(conf, OAUTH_CREDENTIAL_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCredential() {
|
||||
if(credential == null) {
|
||||
throw new IllegalArgumentException("Credential has not been " +
|
||||
"provided in configuration");
|
||||
}
|
||||
|
||||
return credential;
|
||||
}
|
||||
}
|
@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.RequestBody;
|
||||
import com.squareup.okhttp.Response;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
|
||||
|
||||
/**
|
||||
* Supply a access token obtained via a refresh token (provided through the
|
||||
* Configuration using the second half of the
|
||||
* <a href="https://tools.ietf.org/html/rfc6749#section-4.1">
|
||||
* Authorization Code Grant workflow</a>.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class ConfRefreshTokenBasedAccessTokenProvider
|
||||
extends AccessTokenProvider {
|
||||
|
||||
public static final String OAUTH_REFRESH_TOKEN_KEY
|
||||
= "dfs.webhdfs.oauth2.refresh.token";
|
||||
public static final String OAUTH_REFRESH_TOKEN_EXPIRES_KEY
|
||||
= "dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch";
|
||||
|
||||
private AccessTokenTimer accessTokenTimer;
|
||||
|
||||
private String accessToken;
|
||||
|
||||
private String refreshToken;
|
||||
|
||||
private String clientId;
|
||||
|
||||
private String refreshURL;
|
||||
|
||||
|
||||
public ConfRefreshTokenBasedAccessTokenProvider() {
|
||||
this.accessTokenTimer = new AccessTokenTimer();
|
||||
}
|
||||
|
||||
public ConfRefreshTokenBasedAccessTokenProvider(Timer timer) {
|
||||
this.accessTokenTimer = new AccessTokenTimer(timer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
refreshToken = notNull(conf, (OAUTH_REFRESH_TOKEN_KEY));
|
||||
|
||||
accessTokenTimer.setExpiresInMSSinceEpoch(
|
||||
notNull(conf, OAUTH_REFRESH_TOKEN_EXPIRES_KEY));
|
||||
|
||||
clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
|
||||
refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getAccessToken() throws IOException {
|
||||
if(accessTokenTimer.shouldRefresh()) {
|
||||
refresh();
|
||||
}
|
||||
|
||||
return accessToken;
|
||||
}
|
||||
|
||||
void refresh() throws IOException {
|
||||
try {
|
||||
OkHttpClient client = new OkHttpClient();
|
||||
client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
String bodyString = Utils.postBody(GRANT_TYPE, REFRESH_TOKEN,
|
||||
REFRESH_TOKEN, refreshToken,
|
||||
CLIENT_ID, clientId);
|
||||
|
||||
RequestBody body = RequestBody.create(URLENCODED, bodyString);
|
||||
|
||||
Request request = new Request.Builder()
|
||||
.url(refreshURL)
|
||||
.post(body)
|
||||
.build();
|
||||
Response responseBody = client.newCall(request).execute();
|
||||
|
||||
if (responseBody.code() != HttpStatus.SC_OK) {
|
||||
throw new IllegalArgumentException("Received invalid http response: "
|
||||
+ responseBody.code() + ", text = " + responseBody.toString());
|
||||
}
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Map<?, ?> response = mapper.reader(Map.class)
|
||||
.readValue(responseBody.body().string());
|
||||
|
||||
|
||||
String newExpiresIn = response.get(EXPIRES_IN).toString();
|
||||
accessTokenTimer.setExpiresIn(newExpiresIn);
|
||||
|
||||
accessToken = response.get(ACCESS_TOKEN).toString();
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Exception while refreshing access token", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getRefreshToken() {
|
||||
return refreshToken;
|
||||
}
|
||||
}
|
@ -0,0 +1,135 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import com.squareup.okhttp.OkHttpClient;
|
||||
import com.squareup.okhttp.Request;
|
||||
import com.squareup.okhttp.RequestBody;
|
||||
import com.squareup.okhttp.Response;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
|
||||
|
||||
/**
|
||||
* Obtain an access token via the credential-based OAuth2 workflow. This
|
||||
* abstract class requires only that implementations provide the credential,
|
||||
* which the class then uses to obtain a refresh token.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class CredentialBasedAccessTokenProvider
|
||||
extends AccessTokenProvider {
|
||||
public static final String OAUTH_CREDENTIAL_KEY
|
||||
= "dfs.webhdfs.oauth2.credential";
|
||||
|
||||
private AccessTokenTimer timer;
|
||||
|
||||
private String clientId;
|
||||
|
||||
private String refreshURL;
|
||||
|
||||
private String accessToken;
|
||||
|
||||
private boolean initialCredentialObtained = false;
|
||||
|
||||
CredentialBasedAccessTokenProvider() {
|
||||
this.timer = new AccessTokenTimer();
|
||||
}
|
||||
|
||||
CredentialBasedAccessTokenProvider(Timer timer) {
|
||||
this.timer = new AccessTokenTimer(timer);
|
||||
}
|
||||
|
||||
abstract String getCredential();
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
|
||||
refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getAccessToken() throws IOException {
|
||||
if(timer.shouldRefresh() || !initialCredentialObtained) {
|
||||
refresh();
|
||||
initialCredentialObtained = true;
|
||||
}
|
||||
|
||||
return accessToken;
|
||||
}
|
||||
|
||||
void refresh() throws IOException {
|
||||
try {
|
||||
OkHttpClient client = new OkHttpClient();
|
||||
client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
String bodyString = Utils.postBody(CLIENT_SECRET, getCredential(),
|
||||
GRANT_TYPE, CLIENT_CREDENTIALS,
|
||||
CLIENT_ID, clientId);
|
||||
|
||||
RequestBody body = RequestBody.create(URLENCODED, bodyString);
|
||||
|
||||
Request request = new Request.Builder()
|
||||
.url(refreshURL)
|
||||
.post(body)
|
||||
.build();
|
||||
Response responseBody = client.newCall(request).execute();
|
||||
|
||||
if (responseBody.code() != HttpStatus.SC_OK) {
|
||||
throw new IllegalArgumentException("Received invalid http response: "
|
||||
+ responseBody.code() + ", text = " + responseBody.toString());
|
||||
}
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Map<?, ?> response = mapper.reader(Map.class)
|
||||
.readValue(responseBody.body().string());
|
||||
|
||||
String newExpiresIn = response.get(EXPIRES_IN).toString();
|
||||
timer.setExpiresIn(newExpiresIn);
|
||||
|
||||
accessToken = response.get(ACCESS_TOKEN).toString();
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Unable to obtain access token from credential", e);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
|
||||
|
||||
/**
|
||||
* Configure a connection to use OAuth2 authentication.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class OAuth2ConnectionConfigurator implements ConnectionConfigurator {
|
||||
|
||||
public static final String HEADER = "Bearer ";
|
||||
|
||||
private final AccessTokenProvider accessTokenProvider;
|
||||
|
||||
private ConnectionConfigurator sslConfigurator = null;
|
||||
|
||||
public OAuth2ConnectionConfigurator(Configuration conf) {
|
||||
this(conf, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public OAuth2ConnectionConfigurator(Configuration conf,
|
||||
ConnectionConfigurator sslConfigurator) {
|
||||
this.sslConfigurator = sslConfigurator;
|
||||
|
||||
notNull(conf, ACCESS_TOKEN_PROVIDER_KEY);
|
||||
|
||||
Class accessTokenProviderClass = conf.getClass(ACCESS_TOKEN_PROVIDER_KEY,
|
||||
ConfCredentialBasedAccessTokenProvider.class,
|
||||
AccessTokenProvider.class);
|
||||
|
||||
accessTokenProvider = (AccessTokenProvider) ReflectionUtils
|
||||
.newInstance(accessTokenProviderClass, conf);
|
||||
accessTokenProvider.setConf(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
if(sslConfigurator != null) {
|
||||
sslConfigurator.configure(conn);
|
||||
}
|
||||
|
||||
String accessToken = accessTokenProvider.getAccessToken();
|
||||
|
||||
conn.setRequestProperty("AUTHORIZATION", HEADER + accessToken);
|
||||
|
||||
return conn;
|
||||
}
|
||||
}
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import com.squareup.okhttp.MediaType;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
* Sundry constants relating to OAuth2 within WebHDFS.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public final class OAuth2Constants {
|
||||
private OAuth2Constants() { /** Private constructor. **/ }
|
||||
|
||||
public static final MediaType URLENCODED
|
||||
= MediaType.parse("application/x-www-form-urlencoded; charset=utf-8");
|
||||
|
||||
/* Constants for OAuth protocol */
|
||||
public static final String ACCESS_TOKEN = "access_token";
|
||||
public static final String BEARER = "bearer";
|
||||
public static final String CLIENT_CREDENTIALS = "client_credentials";
|
||||
public static final String CLIENT_ID = "client_id";
|
||||
public static final String CLIENT_SECRET = "client_secret";
|
||||
public static final String EXPIRES_IN = "expires_in";
|
||||
public static final String GRANT_TYPE = "grant_type";
|
||||
public static final String REFRESH_TOKEN = "refresh_token";
|
||||
public static final String TOKEN_TYPE = "token_type";
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
final class Utils {
|
||||
private Utils() { /* Private constructor */ }
|
||||
|
||||
public static String notNull(Configuration conf, String key) {
|
||||
String value = conf.get(key);
|
||||
|
||||
if(value == null) {
|
||||
throw new IllegalArgumentException("No value for " + key +
|
||||
" found in conf file.");
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
public static String postBody(String ... kv)
|
||||
throws UnsupportedEncodingException {
|
||||
if(kv.length % 2 != 0) {
|
||||
throw new IllegalArgumentException("Arguments must be key value pairs");
|
||||
}
|
||||
StringBuilder sb = new StringBuilder();
|
||||
int i = 0;
|
||||
|
||||
while(i < kv.length) {
|
||||
if(i > 0) {
|
||||
sb.append("&");
|
||||
}
|
||||
sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
|
||||
sb.append("=");
|
||||
sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* OAuth2-based WebHDFS authentication.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
@ -357,6 +357,8 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8131. Implement a space balanced block placement policy (Liu Shaohui
|
||||
via kihwal)
|
||||
|
||||
HDFS-8155. Support OAuth2 in WebHDFS. (jghoman)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-2390. dfsadmin -setBalancerBandwidth does not validate -ve value
|
||||
|
@ -213,6 +213,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<artifactId>leveldbjni-all</artifactId>
|
||||
<version>1.8</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mock-server</groupId>
|
||||
<artifactId>mockserver-netty</artifactId>
|
||||
<version>3.9.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
|
@ -221,6 +221,31 @@ Below are examples using the `curl` command tool.
|
||||
|
||||
See also: [Authentication for Hadoop HTTP web-consoles](../hadoop-common/HttpAuthentication.html)
|
||||
|
||||
Additionally, WebHDFS supports OAuth2 on the client side. The Namenode and Datanodes do not currently support clients using OAuth2 but other backends that implement the WebHDFS REST interface may.
|
||||
|
||||
WebHDFS supports two type of OAuth2 code grants (user-provided refresh and access token or user provided credential) by default and provides a pluggable mechanism for implementing other OAuth2 authentications per the [OAuth2 RFC](https://tools.ietf.org/html/rfc6749), or custom authentications. When using either of the provided code grant mechanisms, the WebHDFS client will refresh the access token as necessary.
|
||||
|
||||
OAuth2 should only be enabled for clients not running with Kerberos SPENGO.
|
||||
|
||||
| OAuth2 code grant mechanism | Description | Value of `dfs.webhdfs.oauth2.access.token.provider` that implements code grant |
|
||||
|:---- |:---- |:----|
|
||||
| Authorization Code Grant | The user provides an initial access token and refresh token, which are then used to authenticate WebHDFS requests and obtain replacement access tokens, respectively. | org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider |
|
||||
| Client Credentials Grant | The user provides a credential which is used to obtain access tokens, which are then used to authenticate WebHDFS requests. | org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider |
|
||||
|
||||
|
||||
The following properties control OAuth2 authentication.
|
||||
|
||||
| OAuth2 related property | Description |
|
||||
|:---- |:---- |
|
||||
| `dfs.webhdfs.oauth2.enabled` | Boolean to enable/disable OAuth2 authentication |
|
||||
| `dfs.webhdfs.oauth2.access.token.provider` | Class name of an implementation of `org.apache.hadoop.hdfs.web.oauth.AccessTokenProvider.` Two are provided with the code, as described above, or the user may specify a user-provided implementation. The default value for this configuration key is the `ConfCredentialBasedAccessTokenProvider` implementation. |
|
||||
| `dfs.webhdfs.oauth2.client.id` | Client id used to obtain access token with either credential or refresh token |
|
||||
| `dfs.webhdfs.oauth2.refresh.url` | URL against which to post for obtaining bearer token with either credential or refresh token |
|
||||
| `dfs.webhdfs.oauth2.access.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial access token with which to authenticate |
|
||||
| `dfs.webhdfs.oauth2.refresh.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial refresh token to use to obtain new access tokens |
|
||||
| `dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Access token expiration measured in milliseconds since Jan 1, 1970. *Note this is a different value than provided by OAuth providers and has been munged as described in interface to be suitable for a client application* |
|
||||
| `dfs.webhdfs.oauth2.credential` | (required if using ConfCredentialBasedAccessTokenProvider). Credential used to obtain initial and subsequent access tokens. |
|
||||
|
||||
Proxy Users
|
||||
-----------
|
||||
|
||||
|
@ -0,0 +1,216 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider;
|
||||
import org.apache.hadoop.hdfs.web.oauth2.CredentialBasedAccessTokenProvider;
|
||||
import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockserver.client.server.MockServerClient;
|
||||
import org.mockserver.integration.ClientAndServer;
|
||||
import org.mockserver.model.Header;
|
||||
import org.mockserver.model.HttpRequest;
|
||||
import org.mockserver.model.HttpResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
|
||||
import static org.mockserver.matchers.Times.exactly;
|
||||
import static org.mockserver.model.HttpRequest.request;
|
||||
import static org.mockserver.model.HttpResponse.response;
|
||||
|
||||
public class TestWebHDFSOAuth2 {
|
||||
public static final Log LOG = LogFactory.getLog(TestWebHDFSOAuth2.class);
|
||||
|
||||
private ClientAndServer mockWebHDFS;
|
||||
private ClientAndServer mockOAuthServer;
|
||||
|
||||
public final static int WEBHDFS_PORT = 7552;
|
||||
public final static int OAUTH_PORT = 7553;
|
||||
|
||||
public final static Header CONTENT_TYPE_APPLICATION_JSON = new Header("Content-Type", "application/json");
|
||||
|
||||
public final static String AUTH_TOKEN = "0123456789abcdef";
|
||||
public final static Header AUTH_TOKEN_HEADER = new Header("AUTHORIZATION", OAuth2ConnectionConfigurator.HEADER + AUTH_TOKEN);
|
||||
|
||||
@Before
|
||||
public void startMockOAuthServer() {
|
||||
mockOAuthServer = startClientAndServer(OAUTH_PORT);
|
||||
}
|
||||
@Before
|
||||
public void startMockWebHDFSServer() {
|
||||
System.setProperty("hadoop.home.dir", System.getProperty("user.dir"));
|
||||
|
||||
mockWebHDFS = startClientAndServer(WEBHDFS_PORT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void listStatusReturnsAsExpected() throws URISyntaxException, IOException {
|
||||
MockServerClient mockWebHDFSServerClient = new MockServerClient("localhost", WEBHDFS_PORT);
|
||||
MockServerClient mockOAuthServerClient = new MockServerClient("localhost", OAUTH_PORT);
|
||||
|
||||
HttpRequest oauthServerRequest = getOAuthServerMockRequest(mockOAuthServerClient);
|
||||
|
||||
HttpRequest fileSystemRequest = request()
|
||||
.withMethod("GET")
|
||||
.withPath(WebHdfsFileSystem.PATH_PREFIX + "/test1/test2")
|
||||
.withHeader(AUTH_TOKEN_HEADER);
|
||||
|
||||
try {
|
||||
mockWebHDFSServerClient.when(fileSystemRequest,
|
||||
exactly(1)
|
||||
)
|
||||
.respond(
|
||||
response()
|
||||
.withStatusCode(HttpStatus.SC_OK)
|
||||
.withHeaders(
|
||||
CONTENT_TYPE_APPLICATION_JSON
|
||||
)
|
||||
.withBody("{\n" +
|
||||
" \"FileStatuses\":\n" +
|
||||
" {\n" +
|
||||
" \"FileStatus\":\n" +
|
||||
" [\n" +
|
||||
" {\n" +
|
||||
" \"accessTime\" : 1320171722771,\n" +
|
||||
" \"blockSize\" : 33554432,\n" +
|
||||
" \"group\" : \"supergroup\",\n" +
|
||||
" \"length\" : 24930,\n" +
|
||||
" \"modificationTime\": 1320171722771,\n" +
|
||||
" \"owner\" : \"webuser\",\n" +
|
||||
" \"pathSuffix\" : \"a.patch\",\n" +
|
||||
" \"permission\" : \"644\",\n" +
|
||||
" \"replication\" : 1,\n" +
|
||||
" \"type\" : \"FILE\"\n" +
|
||||
" },\n" +
|
||||
" {\n" +
|
||||
" \"accessTime\" : 0,\n" +
|
||||
" \"blockSize\" : 0,\n" +
|
||||
" \"group\" : \"supergroup\",\n" +
|
||||
" \"length\" : 0,\n" +
|
||||
" \"modificationTime\": 1320895981256,\n" +
|
||||
" \"owner\" : \"szetszwo\",\n" +
|
||||
" \"pathSuffix\" : \"bar\",\n" +
|
||||
" \"permission\" : \"711\",\n" +
|
||||
" \"replication\" : 0,\n" +
|
||||
" \"type\" : \"DIRECTORY\"\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
" }\n" +
|
||||
"}\n")
|
||||
);
|
||||
|
||||
FileSystem fs = new WebHdfsFileSystem();
|
||||
Configuration conf = getConfiguration();
|
||||
conf.set(OAUTH_REFRESH_URL_KEY, "http://localhost:" + OAUTH_PORT + "/refresh");
|
||||
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY, "credential");
|
||||
|
||||
URI uri = new URI("webhdfs://localhost:" + WEBHDFS_PORT);
|
||||
fs.initialize(uri, conf);
|
||||
|
||||
FileStatus[] ls = fs.listStatus(new Path("/test1/test2"));
|
||||
|
||||
mockOAuthServer.verify(oauthServerRequest);
|
||||
mockWebHDFSServerClient.verify(fileSystemRequest);
|
||||
|
||||
assertEquals(2, ls.length);
|
||||
assertEquals("a.patch", ls[0].getPath().getName());
|
||||
assertEquals("bar", ls[1].getPath().getName());
|
||||
|
||||
fs.close();
|
||||
} finally {
|
||||
mockWebHDFSServerClient.clear(fileSystemRequest);
|
||||
mockOAuthServerClient.clear(oauthServerRequest);
|
||||
}
|
||||
}
|
||||
|
||||
private HttpRequest getOAuthServerMockRequest(MockServerClient mockServerClient) throws IOException {
|
||||
HttpRequest expectedRequest = request()
|
||||
.withMethod("POST")
|
||||
.withPath("/refresh")
|
||||
.withBody("client_secret=credential&grant_type=client_credentials&client_id=MY_CLIENTID");
|
||||
|
||||
Map<String, Object> map = new TreeMap<>();
|
||||
|
||||
map.put(EXPIRES_IN, "0987654321");
|
||||
map.put(TOKEN_TYPE, "bearer");
|
||||
map.put(ACCESS_TOKEN, AUTH_TOKEN);
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
HttpResponse resp = response()
|
||||
.withStatusCode(HttpStatus.SC_OK)
|
||||
.withHeaders(
|
||||
CONTENT_TYPE_APPLICATION_JSON
|
||||
)
|
||||
.withBody(mapper.writeValueAsString(map));
|
||||
|
||||
mockServerClient
|
||||
.when(expectedRequest, exactly(1))
|
||||
.respond(resp);
|
||||
|
||||
return expectedRequest;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
// Configs for OAuth2
|
||||
conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, true);
|
||||
conf.set(OAUTH_CLIENT_ID_KEY, "MY_CLIENTID");
|
||||
|
||||
conf.set(ACCESS_TOKEN_PROVIDER_KEY,
|
||||
ConfCredentialBasedAccessTokenProvider.class.getName());
|
||||
|
||||
return conf;
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopMockWebHDFSServer() {
|
||||
mockWebHDFS.stop();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopMockOAuthServer() {
|
||||
mockOAuthServer.stop();
|
||||
}
|
||||
}
|
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestAccessTokenTimer {
|
||||
@Test
|
||||
public void expireConversionWorks() {
|
||||
Timer mockTimer = mock(Timer.class);
|
||||
when(mockTimer.now())
|
||||
.thenReturn(5l);
|
||||
|
||||
AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
|
||||
|
||||
timer.setExpiresIn("3");
|
||||
assertEquals(3005, timer.getNextRefreshMSSinceEpoch());
|
||||
|
||||
assertTrue(timer.shouldRefresh());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldRefreshIsCorrect() {
|
||||
Timer mockTimer = mock(Timer.class);
|
||||
when(mockTimer.now())
|
||||
.thenReturn(500l)
|
||||
.thenReturn(1000000l + 500l);
|
||||
|
||||
AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
|
||||
|
||||
timer.setExpiresInMSSinceEpoch("1000000");
|
||||
|
||||
assertFalse(timer.shouldRefresh());
|
||||
assertTrue(timer.shouldRefresh());
|
||||
|
||||
verify(mockTimer, times(2)).now();
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.Test;
|
||||
import org.mockserver.client.server.MockServerClient;
|
||||
import org.mockserver.integration.ClientAndServer;
|
||||
import org.mockserver.model.Header;
|
||||
import org.mockserver.model.HttpRequest;
|
||||
import org.mockserver.model.HttpResponse;
|
||||
import org.mockserver.model.Parameter;
|
||||
import org.mockserver.model.ParameterBody;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
|
||||
import static org.mockserver.matchers.Times.exactly;
|
||||
import static org.mockserver.model.HttpRequest.request;
|
||||
import static org.mockserver.model.HttpResponse.response;
|
||||
|
||||
public class TestClientCredentialTimeBasedTokenRefresher {
|
||||
public final static Header CONTENT_TYPE_APPLICATION_JSON
|
||||
= new Header("Content-Type", "application/json");
|
||||
|
||||
public final static String CLIENT_ID_FOR_TESTING = "joebob";
|
||||
|
||||
public Configuration buildConf(String credential, String tokenExpires,
|
||||
String clientId, String refreshURL) {
|
||||
// Configurations are simple enough that it's not worth mocking them out.
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY,
|
||||
credential);
|
||||
conf.set(ACCESS_TOKEN_PROVIDER_KEY,
|
||||
ConfCredentialBasedAccessTokenProvider.class.getName());
|
||||
conf.set(OAUTH_CLIENT_ID_KEY, clientId);
|
||||
conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshUrlIsCorrect() throws IOException {
|
||||
final int PORT = 7552;
|
||||
final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
|
||||
|
||||
long tokenExpires = 0;
|
||||
|
||||
Configuration conf = buildConf("myreallycoolcredential",
|
||||
Long.toString(tokenExpires),
|
||||
CLIENT_ID_FOR_TESTING,
|
||||
REFRESH_ADDRESS);
|
||||
|
||||
Timer mockTimer = mock(Timer.class);
|
||||
when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
|
||||
|
||||
AccessTokenProvider credProvider =
|
||||
new ConfCredentialBasedAccessTokenProvider(mockTimer);
|
||||
credProvider.setConf(conf);
|
||||
|
||||
// Build mock server to receive refresh request
|
||||
ClientAndServer mockServer = startClientAndServer(PORT);
|
||||
|
||||
HttpRequest expectedRequest = request()
|
||||
.withMethod("POST")
|
||||
.withPath("/refresh")
|
||||
.withBody(
|
||||
// Note, OkHttp does not sort the param values, so we need to do
|
||||
// it ourselves via the ordering provided to ParameterBody...
|
||||
ParameterBody.params(
|
||||
Parameter.param(CLIENT_SECRET, "myreallycoolcredential"),
|
||||
Parameter.param(GRANT_TYPE, CLIENT_CREDENTIALS),
|
||||
Parameter.param(CLIENT_ID, CLIENT_ID_FOR_TESTING)
|
||||
));
|
||||
|
||||
MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
|
||||
|
||||
// https://tools.ietf.org/html/rfc6749#section-5.1
|
||||
Map<String, Object> map = new TreeMap<>();
|
||||
|
||||
map.put(EXPIRES_IN, "0987654321");
|
||||
map.put(TOKEN_TYPE, "bearer");
|
||||
map.put(ACCESS_TOKEN, "new access token");
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
HttpResponse resp = response()
|
||||
.withStatusCode(HttpStatus.SC_OK)
|
||||
.withHeaders(
|
||||
CONTENT_TYPE_APPLICATION_JSON
|
||||
)
|
||||
.withBody(mapper.writeValueAsString(map));
|
||||
|
||||
mockServerClient
|
||||
.when(expectedRequest, exactly(1))
|
||||
.respond(resp);
|
||||
|
||||
assertEquals("new access token", credProvider.getAccessToken());
|
||||
|
||||
mockServerClient.verify(expectedRequest);
|
||||
|
||||
mockServerClient.clear(expectedRequest);
|
||||
mockServer.stop();
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web.oauth2;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.Test;
|
||||
import org.mockserver.client.server.MockServerClient;
|
||||
import org.mockserver.integration.ClientAndServer;
|
||||
import org.mockserver.model.Header;
|
||||
import org.mockserver.model.HttpRequest;
|
||||
import org.mockserver.model.HttpResponse;
|
||||
import org.mockserver.model.Parameter;
|
||||
import org.mockserver.model.ParameterBody;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_EXPIRES_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.BEARER;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
|
||||
import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockserver.integration.ClientAndServer.startClientAndServer;
|
||||
import static org.mockserver.matchers.Times.exactly;
|
||||
import static org.mockserver.model.HttpRequest.request;
|
||||
import static org.mockserver.model.HttpResponse.response;
|
||||
|
||||
public class TestRefreshTokenTimeBasedTokenRefresher {
|
||||
|
||||
public final static Header CONTENT_TYPE_APPLICATION_JSON
|
||||
= new Header("Content-Type", "application/json");
|
||||
|
||||
public Configuration buildConf(String refreshToken, String tokenExpires,
|
||||
String clientId, String refreshURL) {
|
||||
// Configurations are simple enough that it's not worth mocking them out.
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(OAUTH_REFRESH_TOKEN_KEY, refreshToken);
|
||||
conf.set(OAUTH_REFRESH_TOKEN_EXPIRES_KEY, tokenExpires);
|
||||
conf.set(OAUTH_CLIENT_ID_KEY, clientId);
|
||||
conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refreshUrlIsCorrect() throws IOException {
|
||||
final int PORT = 7552;
|
||||
final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
|
||||
|
||||
long tokenExpires = 0;
|
||||
|
||||
Configuration conf = buildConf("refresh token key",
|
||||
Long.toString(tokenExpires),
|
||||
"joebob",
|
||||
REFRESH_ADDRESS);
|
||||
|
||||
Timer mockTimer = mock(Timer.class);
|
||||
when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
|
||||
|
||||
AccessTokenProvider tokenProvider =
|
||||
new ConfRefreshTokenBasedAccessTokenProvider(mockTimer);
|
||||
tokenProvider.setConf(conf);
|
||||
|
||||
// Build mock server to receive refresh request
|
||||
|
||||
ClientAndServer mockServer = startClientAndServer(PORT);
|
||||
|
||||
HttpRequest expectedRequest = request()
|
||||
.withMethod("POST")
|
||||
.withPath("/refresh")
|
||||
// Note, OkHttp does not sort the param values, so we need to
|
||||
// do it ourselves via the ordering provided to ParameterBody...
|
||||
.withBody(
|
||||
ParameterBody.params(
|
||||
Parameter.param(CLIENT_ID, "joebob"),
|
||||
Parameter.param(GRANT_TYPE, REFRESH_TOKEN),
|
||||
Parameter.param(REFRESH_TOKEN, "refresh token key")));
|
||||
|
||||
MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
|
||||
|
||||
// https://tools.ietf.org/html/rfc6749#section-5.1
|
||||
Map<String, Object> map = new TreeMap<>();
|
||||
|
||||
map.put(EXPIRES_IN, "0987654321");
|
||||
map.put(TOKEN_TYPE, BEARER);
|
||||
map.put(ACCESS_TOKEN, "new access token");
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
HttpResponse resp = response()
|
||||
.withStatusCode(HttpStatus.SC_OK)
|
||||
.withHeaders(
|
||||
CONTENT_TYPE_APPLICATION_JSON
|
||||
)
|
||||
.withBody(mapper.writeValueAsString(map));
|
||||
|
||||
mockServerClient
|
||||
.when(expectedRequest, exactly(1))
|
||||
.respond(resp);
|
||||
|
||||
assertEquals("new access token", tokenProvider.getAccessToken());
|
||||
|
||||
mockServerClient.verify(expectedRequest);
|
||||
|
||||
mockServerClient.clear(expectedRequest);
|
||||
mockServer.stop();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user