diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index aeaa98058c..68bd289e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -31,6 +31,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
jar
+
+ com.squareup.okhttp
+ okhttp
+ 2.4.0
+
org.apache.hadoop
hadoop-common
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 7b1e4384b5..96bc8d3064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -36,6 +36,14 @@ public interface HdfsClientConfigKeys {
String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
+ String DFS_WEBHDFS_OAUTH_ENABLED_KEY = "dfs.webhdfs.oauth2.enabled";
+ boolean DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT = false;
+
+ String OAUTH_CLIENT_ID_KEY = "dfs.webhdfs.oauth2.client.id";
+ String OAUTH_REFRESH_URL_KEY = "dfs.webhdfs.oauth2.refresh.url";
+
+ String ACCESS_TOKEN_PROVIDER_KEY = "dfs.webhdfs.oauth2.access.token.provider";
+
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
index a5e02f234d..4c2324152a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
@@ -77,15 +78,42 @@ public HttpURLConnection configure(HttpURLConnection conn)
* try to load SSL certificates when it is specified.
*/
public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
+ ConnectionConfigurator conn = getSSLConnectionConfiguration(conf);
+
+ return new URLConnectionFactory(conn);
+ }
+
+ private static ConnectionConfigurator
+ getSSLConnectionConfiguration(Configuration conf) {
ConnectionConfigurator conn = null;
try {
conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
} catch (Exception e) {
LOG.debug(
- "Cannot load customized ssl related configuration. Fallback to system-generic settings.",
+ "Cannot load customized ssl related configuration. Fallback to" +
+ " system-generic settings.",
e);
conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
}
+
+ return conn;
+ }
+
+ /**
+ * Construct a new URLConnectionFactory that supports OAut-based connections.
+ * It will also try to load the SSL configuration when they are specified.
+ */
+ public static URLConnectionFactory
+ newOAuth2URLConnectionFactory(Configuration conf) throws IOException {
+ ConnectionConfigurator conn = null;
+ try {
+ ConnectionConfigurator sslConnConfigurator
+ = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+
+ conn = new OAuth2ConnectionConfigurator(conf, sslConnConfigurator);
+ } catch (Exception e) {
+ throw new IOException("Unable to load OAuth2 connection factory.", e);
+ }
return new URLConnectionFactory(conn);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index ee5238acf9..a75e78f98e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -149,8 +149,19 @@ public synchronized void initialize(URI uri, Configuration conf
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
- connectionFactory = URLConnectionFactory
- .newDefaultURLConnectionFactory(conf);
+ boolean isOAuth = conf.getBoolean(
+ HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY,
+ HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_DEFAULT);
+
+ if(isOAuth) {
+ LOG.info("Enabling OAuth2 in WebHDFS");
+ connectionFactory = URLConnectionFactory
+ .newOAuth2URLConnectionFactory(conf);
+ } else {
+ LOG.info("Not enabling OAuth2 in WebHDFS");
+ connectionFactory = URLConnectionFactory
+ .newDefaultURLConnectionFactory(conf);
+ }
ugi = UserGroupInformation.getCurrentUser();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenProvider.java
new file mode 100644
index 0000000000..99e153d7b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Provide an OAuth2 access token to be used to authenticate http calls in
+ * WebHDFS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AccessTokenProvider implements Configurable {
+ private Configuration conf;
+
+ /**
+ * Obtain the access token that should be added to http connection's header.
+ * Will be called for each connection, so implementations should be
+ * performant. Implementations are responsible for any refreshing of
+ * the token.
+ *
+ * @return Access token to be added to connection header.
+ */
+ abstract String getAccessToken() throws IOException;
+
+ /**
+ * Return the conf.
+ *
+ * @return the conf.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Set the conf.
+ *
+ * @param configuration New configuration.
+ */
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenTimer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenTimer.java
new file mode 100644
index 0000000000..aa05dd6db3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/AccessTokenTimer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Timer;
+
+/**
+ * Access tokens generally expire. This timer helps keep track of that.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AccessTokenTimer {
+ public static final long EXPIRE_BUFFER_MS = 30 * 1000L;
+
+ private final Timer timer;
+
+ /**
+ * When the current access token will expire in milliseconds since
+ * epoch.
+ */
+ private long nextRefreshMSSinceEpoch;
+
+ public AccessTokenTimer() {
+ this(new Timer());
+ }
+
+ /**
+ *
+ * @param timer Timer instance for unit testing
+ */
+ public AccessTokenTimer(Timer timer) {
+ this.timer = timer;
+ this.nextRefreshMSSinceEpoch = 0;
+ }
+
+ /**
+ * Set when the access token will expire as reported by the oauth server,
+ * ie in seconds from now.
+ * @param expiresIn Access time expiration as reported by OAuth server
+ */
+ public void setExpiresIn(String expiresIn) {
+ this.nextRefreshMSSinceEpoch = convertExpiresIn(timer, expiresIn);
+ }
+
+ /**
+ * Set when the access token will expire in milliseconds from epoch,
+ * as required by the WebHDFS configuration. This is a bit hacky and lame.
+ *
+ * @param expiresInMSSinceEpoch Access time expiration in ms since epoch.
+ */
+ public void setExpiresInMSSinceEpoch(String expiresInMSSinceEpoch){
+ this.nextRefreshMSSinceEpoch = Long.parseLong(expiresInMSSinceEpoch);
+ }
+
+ /**
+ * Get next time we should refresh the token.
+ *
+ * @return Next time since epoch we'll need to refresh the token.
+ */
+ public long getNextRefreshMSSinceEpoch() {
+ return nextRefreshMSSinceEpoch;
+ }
+
+ /**
+ * Return true if the current token has expired or will expire within the
+ * EXPIRE_BUFFER_MS (to give ample wiggle room for the call to be made to
+ * the server).
+ */
+ public boolean shouldRefresh() {
+ long lowerLimit = nextRefreshMSSinceEpoch - EXPIRE_BUFFER_MS;
+ long currTime = timer.now();
+ return currTime > lowerLimit;
+ }
+
+ /**
+ * The expires_in param from OAuth is in seconds-from-now. Convert to
+ * milliseconds-from-epoch
+ */
+ static Long convertExpiresIn(Timer timer, String expiresInSecs) {
+ long expiresSecs = Long.parseLong(expiresInSecs);
+ long expiresMs = expiresSecs * 1000;
+ return timer.now() + expiresMs;
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfCredentialBasedAccessTokenProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfCredentialBasedAccessTokenProvider.java
new file mode 100644
index 0000000000..b56dbde1bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfCredentialBasedAccessTokenProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Timer;
+
+import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
+
+/**
+ * Obtain an access token via a a credential (provided through the
+ * Configuration) using the
+ *
+ * Client Credentials Grant workflow.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ConfCredentialBasedAccessTokenProvider
+ extends CredentialBasedAccessTokenProvider {
+ private String credential;
+
+ public ConfCredentialBasedAccessTokenProvider() {
+ }
+
+ public ConfCredentialBasedAccessTokenProvider(Timer timer) {
+ super(timer);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ credential = notNull(conf, OAUTH_CREDENTIAL_KEY);
+ }
+
+ @Override
+ public String getCredential() {
+ if(credential == null) {
+ throw new IllegalArgumentException("Credential has not been " +
+ "provided in configuration");
+ }
+
+ return credential;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfRefreshTokenBasedAccessTokenProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000000..1e80451fb9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/ConfRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.util.Timer;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
+import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
+
+/**
+ * Supply a access token obtained via a refresh token (provided through the
+ * Configuration using the second half of the
+ *
+ * Authorization Code Grant workflow.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ConfRefreshTokenBasedAccessTokenProvider
+ extends AccessTokenProvider {
+
+ public static final String OAUTH_REFRESH_TOKEN_KEY
+ = "dfs.webhdfs.oauth2.refresh.token";
+ public static final String OAUTH_REFRESH_TOKEN_EXPIRES_KEY
+ = "dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch";
+
+ private AccessTokenTimer accessTokenTimer;
+
+ private String accessToken;
+
+ private String refreshToken;
+
+ private String clientId;
+
+ private String refreshURL;
+
+
+ public ConfRefreshTokenBasedAccessTokenProvider() {
+ this.accessTokenTimer = new AccessTokenTimer();
+ }
+
+ public ConfRefreshTokenBasedAccessTokenProvider(Timer timer) {
+ this.accessTokenTimer = new AccessTokenTimer(timer);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ refreshToken = notNull(conf, (OAUTH_REFRESH_TOKEN_KEY));
+
+ accessTokenTimer.setExpiresInMSSinceEpoch(
+ notNull(conf, OAUTH_REFRESH_TOKEN_EXPIRES_KEY));
+
+ clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
+ refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
+
+ }
+
+ @Override
+ public synchronized String getAccessToken() throws IOException {
+ if(accessTokenTimer.shouldRefresh()) {
+ refresh();
+ }
+
+ return accessToken;
+ }
+
+ void refresh() throws IOException {
+ try {
+ OkHttpClient client = new OkHttpClient();
+ client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+
+ String bodyString = Utils.postBody(GRANT_TYPE, REFRESH_TOKEN,
+ REFRESH_TOKEN, refreshToken,
+ CLIENT_ID, clientId);
+
+ RequestBody body = RequestBody.create(URLENCODED, bodyString);
+
+ Request request = new Request.Builder()
+ .url(refreshURL)
+ .post(body)
+ .build();
+ Response responseBody = client.newCall(request).execute();
+
+ if (responseBody.code() != HttpStatus.SC_OK) {
+ throw new IllegalArgumentException("Received invalid http response: "
+ + responseBody.code() + ", text = " + responseBody.toString());
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map, ?> response = mapper.reader(Map.class)
+ .readValue(responseBody.body().string());
+
+
+ String newExpiresIn = response.get(EXPIRES_IN).toString();
+ accessTokenTimer.setExpiresIn(newExpiresIn);
+
+ accessToken = response.get(ACCESS_TOKEN).toString();
+ } catch (Exception e) {
+ throw new IOException("Exception while refreshing access token", e);
+ }
+ }
+
+ public String getRefreshToken() {
+ return refreshToken;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/CredentialBasedAccessTokenProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/CredentialBasedAccessTokenProvider.java
new file mode 100644
index 0000000000..c058e05b93
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/CredentialBasedAccessTokenProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.Response;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.util.Timer;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.URLENCODED;
+import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
+
+/**
+ * Obtain an access token via the credential-based OAuth2 workflow. This
+ * abstract class requires only that implementations provide the credential,
+ * which the class then uses to obtain a refresh token.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class CredentialBasedAccessTokenProvider
+ extends AccessTokenProvider {
+ public static final String OAUTH_CREDENTIAL_KEY
+ = "dfs.webhdfs.oauth2.credential";
+
+ private AccessTokenTimer timer;
+
+ private String clientId;
+
+ private String refreshURL;
+
+ private String accessToken;
+
+ private boolean initialCredentialObtained = false;
+
+ CredentialBasedAccessTokenProvider() {
+ this.timer = new AccessTokenTimer();
+ }
+
+ CredentialBasedAccessTokenProvider(Timer timer) {
+ this.timer = new AccessTokenTimer(timer);
+ }
+
+ abstract String getCredential();
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ clientId = notNull(conf, OAUTH_CLIENT_ID_KEY);
+ refreshURL = notNull(conf, OAUTH_REFRESH_URL_KEY);
+ }
+
+ @Override
+ public synchronized String getAccessToken() throws IOException {
+ if(timer.shouldRefresh() || !initialCredentialObtained) {
+ refresh();
+ initialCredentialObtained = true;
+ }
+
+ return accessToken;
+ }
+
+ void refresh() throws IOException {
+ try {
+ OkHttpClient client = new OkHttpClient();
+ client.setConnectTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ client.setReadTimeout(URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+
+ String bodyString = Utils.postBody(CLIENT_SECRET, getCredential(),
+ GRANT_TYPE, CLIENT_CREDENTIALS,
+ CLIENT_ID, clientId);
+
+ RequestBody body = RequestBody.create(URLENCODED, bodyString);
+
+ Request request = new Request.Builder()
+ .url(refreshURL)
+ .post(body)
+ .build();
+ Response responseBody = client.newCall(request).execute();
+
+ if (responseBody.code() != HttpStatus.SC_OK) {
+ throw new IllegalArgumentException("Received invalid http response: "
+ + responseBody.code() + ", text = " + responseBody.toString());
+ }
+
+ ObjectMapper mapper = new ObjectMapper();
+ Map, ?> response = mapper.reader(Map.class)
+ .readValue(responseBody.body().string());
+
+ String newExpiresIn = response.get(EXPIRES_IN).toString();
+ timer.setExpiresIn(newExpiresIn);
+
+ accessToken = response.get(ACCESS_TOKEN).toString();
+
+ } catch (Exception e) {
+ throw new IOException("Unable to obtain access token from credential", e);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2ConnectionConfigurator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2ConnectionConfigurator.java
new file mode 100644
index 0000000000..f334b2431f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2ConnectionConfigurator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.Utils.notNull;
+
+/**
+ * Configure a connection to use OAuth2 authentication.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class OAuth2ConnectionConfigurator implements ConnectionConfigurator {
+
+ public static final String HEADER = "Bearer ";
+
+ private final AccessTokenProvider accessTokenProvider;
+
+ private ConnectionConfigurator sslConfigurator = null;
+
+ public OAuth2ConnectionConfigurator(Configuration conf) {
+ this(conf, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public OAuth2ConnectionConfigurator(Configuration conf,
+ ConnectionConfigurator sslConfigurator) {
+ this.sslConfigurator = sslConfigurator;
+
+ notNull(conf, ACCESS_TOKEN_PROVIDER_KEY);
+
+ Class accessTokenProviderClass = conf.getClass(ACCESS_TOKEN_PROVIDER_KEY,
+ ConfCredentialBasedAccessTokenProvider.class,
+ AccessTokenProvider.class);
+
+ accessTokenProvider = (AccessTokenProvider) ReflectionUtils
+ .newInstance(accessTokenProviderClass, conf);
+ accessTokenProvider.setConf(conf);
+ }
+
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ if(sslConfigurator != null) {
+ sslConfigurator.configure(conn);
+ }
+
+ String accessToken = accessTokenProvider.getAccessToken();
+
+ conn.setRequestProperty("AUTHORIZATION", HEADER + accessToken);
+
+ return conn;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2Constants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2Constants.java
new file mode 100644
index 0000000000..190a1f5b6a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/OAuth2Constants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import com.squareup.okhttp.MediaType;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Sundry constants relating to OAuth2 within WebHDFS.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class OAuth2Constants {
+ private OAuth2Constants() { /** Private constructor. **/ }
+
+ public static final MediaType URLENCODED
+ = MediaType.parse("application/x-www-form-urlencoded; charset=utf-8");
+
+ /* Constants for OAuth protocol */
+ public static final String ACCESS_TOKEN = "access_token";
+ public static final String BEARER = "bearer";
+ public static final String CLIENT_CREDENTIALS = "client_credentials";
+ public static final String CLIENT_ID = "client_id";
+ public static final String CLIENT_SECRET = "client_secret";
+ public static final String EXPIRES_IN = "expires_in";
+ public static final String GRANT_TYPE = "grant_type";
+ public static final String REFRESH_TOKEN = "refresh_token";
+ public static final String TOKEN_TYPE = "token_type";
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/Utils.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/Utils.java
new file mode 100644
index 0000000000..939798db16
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/Utils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class Utils {
+ private Utils() { /* Private constructor */ }
+
+ public static String notNull(Configuration conf, String key) {
+ String value = conf.get(key);
+
+ if(value == null) {
+ throw new IllegalArgumentException("No value for " + key +
+ " found in conf file.");
+ }
+
+ return value;
+ }
+
+ public static String postBody(String ... kv)
+ throws UnsupportedEncodingException {
+ if(kv.length % 2 != 0) {
+ throw new IllegalArgumentException("Arguments must be key value pairs");
+ }
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+
+ while(i < kv.length) {
+ if(i > 0) {
+ sb.append("&");
+ }
+ sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
+ sb.append("=");
+ sb.append(URLEncoder.encode(kv[i++], "UTF-8"));
+ }
+
+ return sb.toString();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
new file mode 100644
index 0000000000..aeb581fd1e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * OAuth2-based WebHDFS authentication.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6f46ea5453..3382f81a20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -357,6 +357,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8131. Implement a space balanced block placement policy (Liu Shaohui
via kihwal)
+ HDFS-8155. Support OAuth2 in WebHDFS. (jghoman)
+
IMPROVEMENTS
HDFS-2390. dfsadmin -setBalancerBandwidth does not validate -ve value
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 1a29ad3d8c..d0c2dc7d59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -213,6 +213,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
leveldbjni-all
1.8
+
+ org.mock-server
+ mockserver-netty
+ 3.9.2
+ test
+
org.bouncycastle
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
index 20b9d731ca..d0a0fe08c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/WebHDFS.md
@@ -221,6 +221,31 @@ Below are examples using the `curl` command tool.
See also: [Authentication for Hadoop HTTP web-consoles](../hadoop-common/HttpAuthentication.html)
+Additionally, WebHDFS supports OAuth2 on the client side. The Namenode and Datanodes do not currently support clients using OAuth2 but other backends that implement the WebHDFS REST interface may.
+
+WebHDFS supports two type of OAuth2 code grants (user-provided refresh and access token or user provided credential) by default and provides a pluggable mechanism for implementing other OAuth2 authentications per the [OAuth2 RFC](https://tools.ietf.org/html/rfc6749), or custom authentications. When using either of the provided code grant mechanisms, the WebHDFS client will refresh the access token as necessary.
+
+OAuth2 should only be enabled for clients not running with Kerberos SPENGO.
+
+| OAuth2 code grant mechanism | Description | Value of `dfs.webhdfs.oauth2.access.token.provider` that implements code grant |
+|:---- |:---- |:----|
+| Authorization Code Grant | The user provides an initial access token and refresh token, which are then used to authenticate WebHDFS requests and obtain replacement access tokens, respectively. | org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider |
+| Client Credentials Grant | The user provides a credential which is used to obtain access tokens, which are then used to authenticate WebHDFS requests. | org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider |
+
+
+The following properties control OAuth2 authentication.
+
+| OAuth2 related property | Description |
+|:---- |:---- |
+| `dfs.webhdfs.oauth2.enabled` | Boolean to enable/disable OAuth2 authentication |
+| `dfs.webhdfs.oauth2.access.token.provider` | Class name of an implementation of `org.apache.hadoop.hdfs.web.oauth.AccessTokenProvider.` Two are provided with the code, as described above, or the user may specify a user-provided implementation. The default value for this configuration key is the `ConfCredentialBasedAccessTokenProvider` implementation. |
+| `dfs.webhdfs.oauth2.client.id` | Client id used to obtain access token with either credential or refresh token |
+| `dfs.webhdfs.oauth2.refresh.url` | URL against which to post for obtaining bearer token with either credential or refresh token |
+| `dfs.webhdfs.oauth2.access.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial access token with which to authenticate |
+| `dfs.webhdfs.oauth2.refresh.token` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Initial refresh token to use to obtain new access tokens |
+| `dfs.webhdfs.oauth2.refresh.token.expires.ms.since.epoch` | (required if using ConfRefreshTokenBasedAccessTokenProvider) Access token expiration measured in milliseconds since Jan 1, 1970. *Note this is a different value than provided by OAuth providers and has been munged as described in interface to be suitable for a client application* |
+| `dfs.webhdfs.oauth2.credential` | (required if using ConfCredentialBasedAccessTokenProvider). Credential used to obtain initial and subsequent access tokens. |
+
Proxy Users
-----------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSOAuth2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSOAuth2.java
new file mode 100644
index 0000000000..e2f6230ef2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFSOAuth2.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.web.oauth2.ConfCredentialBasedAccessTokenProvider;
+import org.apache.hadoop.hdfs.web.oauth2.CredentialBasedAccessTokenProvider;
+import org.apache.hadoop.hdfs.web.oauth2.OAuth2ConnectionConfigurator;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.Header;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.matchers.Times.exactly;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class TestWebHDFSOAuth2 {
+ public static final Log LOG = LogFactory.getLog(TestWebHDFSOAuth2.class);
+
+ private ClientAndServer mockWebHDFS;
+ private ClientAndServer mockOAuthServer;
+
+ public final static int WEBHDFS_PORT = 7552;
+ public final static int OAUTH_PORT = 7553;
+
+ public final static Header CONTENT_TYPE_APPLICATION_JSON = new Header("Content-Type", "application/json");
+
+ public final static String AUTH_TOKEN = "0123456789abcdef";
+ public final static Header AUTH_TOKEN_HEADER = new Header("AUTHORIZATION", OAuth2ConnectionConfigurator.HEADER + AUTH_TOKEN);
+
+ @Before
+ public void startMockOAuthServer() {
+ mockOAuthServer = startClientAndServer(OAUTH_PORT);
+ }
+ @Before
+ public void startMockWebHDFSServer() {
+ System.setProperty("hadoop.home.dir", System.getProperty("user.dir"));
+
+ mockWebHDFS = startClientAndServer(WEBHDFS_PORT);
+ }
+
+ @Test
+ public void listStatusReturnsAsExpected() throws URISyntaxException, IOException {
+ MockServerClient mockWebHDFSServerClient = new MockServerClient("localhost", WEBHDFS_PORT);
+ MockServerClient mockOAuthServerClient = new MockServerClient("localhost", OAUTH_PORT);
+
+ HttpRequest oauthServerRequest = getOAuthServerMockRequest(mockOAuthServerClient);
+
+ HttpRequest fileSystemRequest = request()
+ .withMethod("GET")
+ .withPath(WebHdfsFileSystem.PATH_PREFIX + "/test1/test2")
+ .withHeader(AUTH_TOKEN_HEADER);
+
+ try {
+ mockWebHDFSServerClient.when(fileSystemRequest,
+ exactly(1)
+ )
+ .respond(
+ response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withHeaders(
+ CONTENT_TYPE_APPLICATION_JSON
+ )
+ .withBody("{\n" +
+ " \"FileStatuses\":\n" +
+ " {\n" +
+ " \"FileStatus\":\n" +
+ " [\n" +
+ " {\n" +
+ " \"accessTime\" : 1320171722771,\n" +
+ " \"blockSize\" : 33554432,\n" +
+ " \"group\" : \"supergroup\",\n" +
+ " \"length\" : 24930,\n" +
+ " \"modificationTime\": 1320171722771,\n" +
+ " \"owner\" : \"webuser\",\n" +
+ " \"pathSuffix\" : \"a.patch\",\n" +
+ " \"permission\" : \"644\",\n" +
+ " \"replication\" : 1,\n" +
+ " \"type\" : \"FILE\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"accessTime\" : 0,\n" +
+ " \"blockSize\" : 0,\n" +
+ " \"group\" : \"supergroup\",\n" +
+ " \"length\" : 0,\n" +
+ " \"modificationTime\": 1320895981256,\n" +
+ " \"owner\" : \"szetszwo\",\n" +
+ " \"pathSuffix\" : \"bar\",\n" +
+ " \"permission\" : \"711\",\n" +
+ " \"replication\" : 0,\n" +
+ " \"type\" : \"DIRECTORY\"\n" +
+ " }\n" +
+ " ]\n" +
+ " }\n" +
+ "}\n")
+ );
+
+ FileSystem fs = new WebHdfsFileSystem();
+ Configuration conf = getConfiguration();
+ conf.set(OAUTH_REFRESH_URL_KEY, "http://localhost:" + OAUTH_PORT + "/refresh");
+ conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY, "credential");
+
+ URI uri = new URI("webhdfs://localhost:" + WEBHDFS_PORT);
+ fs.initialize(uri, conf);
+
+ FileStatus[] ls = fs.listStatus(new Path("/test1/test2"));
+
+ mockOAuthServer.verify(oauthServerRequest);
+ mockWebHDFSServerClient.verify(fileSystemRequest);
+
+ assertEquals(2, ls.length);
+ assertEquals("a.patch", ls[0].getPath().getName());
+ assertEquals("bar", ls[1].getPath().getName());
+
+ fs.close();
+ } finally {
+ mockWebHDFSServerClient.clear(fileSystemRequest);
+ mockOAuthServerClient.clear(oauthServerRequest);
+ }
+ }
+
+ private HttpRequest getOAuthServerMockRequest(MockServerClient mockServerClient) throws IOException {
+ HttpRequest expectedRequest = request()
+ .withMethod("POST")
+ .withPath("/refresh")
+ .withBody("client_secret=credential&grant_type=client_credentials&client_id=MY_CLIENTID");
+
+ Map map = new TreeMap<>();
+
+ map.put(EXPIRES_IN, "0987654321");
+ map.put(TOKEN_TYPE, "bearer");
+ map.put(ACCESS_TOKEN, AUTH_TOKEN);
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ HttpResponse resp = response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withHeaders(
+ CONTENT_TYPE_APPLICATION_JSON
+ )
+ .withBody(mapper.writeValueAsString(map));
+
+ mockServerClient
+ .when(expectedRequest, exactly(1))
+ .respond(resp);
+
+ return expectedRequest;
+ }
+
+ public Configuration getConfiguration() {
+ Configuration conf = new Configuration();
+
+ // Configs for OAuth2
+ conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_OAUTH_ENABLED_KEY, true);
+ conf.set(OAUTH_CLIENT_ID_KEY, "MY_CLIENTID");
+
+ conf.set(ACCESS_TOKEN_PROVIDER_KEY,
+ ConfCredentialBasedAccessTokenProvider.class.getName());
+
+ return conf;
+
+ }
+
+ @After
+ public void stopMockWebHDFSServer() {
+ mockWebHDFS.stop();
+ }
+
+ @After
+ public void stopMockOAuthServer() {
+ mockOAuthServer.stop();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestAccessTokenTimer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestAccessTokenTimer.java
new file mode 100644
index 0000000000..c387b1ebce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestAccessTokenTimer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.util.Timer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestAccessTokenTimer {
+ @Test
+ public void expireConversionWorks() {
+ Timer mockTimer = mock(Timer.class);
+ when(mockTimer.now())
+ .thenReturn(5l);
+
+ AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
+
+ timer.setExpiresIn("3");
+ assertEquals(3005, timer.getNextRefreshMSSinceEpoch());
+
+ assertTrue(timer.shouldRefresh());
+ }
+
+ @Test
+ public void shouldRefreshIsCorrect() {
+ Timer mockTimer = mock(Timer.class);
+ when(mockTimer.now())
+ .thenReturn(500l)
+ .thenReturn(1000000l + 500l);
+
+ AccessTokenTimer timer = new AccessTokenTimer(mockTimer);
+
+ timer.setExpiresInMSSinceEpoch("1000000");
+
+ assertFalse(timer.shouldRefresh());
+ assertTrue(timer.shouldRefresh());
+
+ verify(mockTimer, times(2)).now();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestClientCredentialTimeBasedTokenRefresher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestClientCredentialTimeBasedTokenRefresher.java
new file mode 100644
index 0000000000..c259b30aae
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestClientCredentialTimeBasedTokenRefresher.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Timer;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.Header;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.model.Parameter;
+import org.mockserver.model.ParameterBody;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ACCESS_TOKEN_PROVIDER_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_CREDENTIALS;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_SECRET;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.matchers.Times.exactly;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class TestClientCredentialTimeBasedTokenRefresher {
+ public final static Header CONTENT_TYPE_APPLICATION_JSON
+ = new Header("Content-Type", "application/json");
+
+ public final static String CLIENT_ID_FOR_TESTING = "joebob";
+
+ public Configuration buildConf(String credential, String tokenExpires,
+ String clientId, String refreshURL) {
+ // Configurations are simple enough that it's not worth mocking them out.
+ Configuration conf = new Configuration();
+ conf.set(CredentialBasedAccessTokenProvider.OAUTH_CREDENTIAL_KEY,
+ credential);
+ conf.set(ACCESS_TOKEN_PROVIDER_KEY,
+ ConfCredentialBasedAccessTokenProvider.class.getName());
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId);
+ conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
+ return conf;
+ }
+
+ @Test
+ public void refreshUrlIsCorrect() throws IOException {
+ final int PORT = 7552;
+ final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
+
+ long tokenExpires = 0;
+
+ Configuration conf = buildConf("myreallycoolcredential",
+ Long.toString(tokenExpires),
+ CLIENT_ID_FOR_TESTING,
+ REFRESH_ADDRESS);
+
+ Timer mockTimer = mock(Timer.class);
+ when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
+
+ AccessTokenProvider credProvider =
+ new ConfCredentialBasedAccessTokenProvider(mockTimer);
+ credProvider.setConf(conf);
+
+ // Build mock server to receive refresh request
+ ClientAndServer mockServer = startClientAndServer(PORT);
+
+ HttpRequest expectedRequest = request()
+ .withMethod("POST")
+ .withPath("/refresh")
+ .withBody(
+ // Note, OkHttp does not sort the param values, so we need to do
+ // it ourselves via the ordering provided to ParameterBody...
+ ParameterBody.params(
+ Parameter.param(CLIENT_SECRET, "myreallycoolcredential"),
+ Parameter.param(GRANT_TYPE, CLIENT_CREDENTIALS),
+ Parameter.param(CLIENT_ID, CLIENT_ID_FOR_TESTING)
+ ));
+
+ MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
+
+ // https://tools.ietf.org/html/rfc6749#section-5.1
+ Map map = new TreeMap<>();
+
+ map.put(EXPIRES_IN, "0987654321");
+ map.put(TOKEN_TYPE, "bearer");
+ map.put(ACCESS_TOKEN, "new access token");
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ HttpResponse resp = response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withHeaders(
+ CONTENT_TYPE_APPLICATION_JSON
+ )
+ .withBody(mapper.writeValueAsString(map));
+
+ mockServerClient
+ .when(expectedRequest, exactly(1))
+ .respond(resp);
+
+ assertEquals("new access token", credProvider.getAccessToken());
+
+ mockServerClient.verify(expectedRequest);
+
+ mockServerClient.clear(expectedRequest);
+ mockServer.stop();
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestRefreshTokenTimeBasedTokenRefresher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestRefreshTokenTimeBasedTokenRefresher.java
new file mode 100644
index 0000000000..889ad0e9f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/oauth2/TestRefreshTokenTimeBasedTokenRefresher.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Timer;
+import org.apache.http.HttpStatus;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+import org.mockserver.client.server.MockServerClient;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.Header;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.model.Parameter;
+import org.mockserver.model.ParameterBody;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_EXPIRES_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.BEARER;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.CLIENT_ID;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.EXPIRES_IN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.GRANT_TYPE;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.REFRESH_TOKEN;
+import static org.apache.hadoop.hdfs.web.oauth2.OAuth2Constants.TOKEN_TYPE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+import static org.mockserver.matchers.Times.exactly;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class TestRefreshTokenTimeBasedTokenRefresher {
+
+ public final static Header CONTENT_TYPE_APPLICATION_JSON
+ = new Header("Content-Type", "application/json");
+
+ public Configuration buildConf(String refreshToken, String tokenExpires,
+ String clientId, String refreshURL) {
+ // Configurations are simple enough that it's not worth mocking them out.
+ Configuration conf = new Configuration();
+ conf.set(OAUTH_REFRESH_TOKEN_KEY, refreshToken);
+ conf.set(OAUTH_REFRESH_TOKEN_EXPIRES_KEY, tokenExpires);
+ conf.set(OAUTH_CLIENT_ID_KEY, clientId);
+ conf.set(OAUTH_REFRESH_URL_KEY, refreshURL);
+
+ return conf;
+ }
+
+ @Test
+ public void refreshUrlIsCorrect() throws IOException {
+ final int PORT = 7552;
+ final String REFRESH_ADDRESS = "http://localhost:" + PORT + "/refresh";
+
+ long tokenExpires = 0;
+
+ Configuration conf = buildConf("refresh token key",
+ Long.toString(tokenExpires),
+ "joebob",
+ REFRESH_ADDRESS);
+
+ Timer mockTimer = mock(Timer.class);
+ when(mockTimer.now()).thenReturn(tokenExpires + 1000l);
+
+ AccessTokenProvider tokenProvider =
+ new ConfRefreshTokenBasedAccessTokenProvider(mockTimer);
+ tokenProvider.setConf(conf);
+
+ // Build mock server to receive refresh request
+
+ ClientAndServer mockServer = startClientAndServer(PORT);
+
+ HttpRequest expectedRequest = request()
+ .withMethod("POST")
+ .withPath("/refresh")
+ // Note, OkHttp does not sort the param values, so we need to
+ // do it ourselves via the ordering provided to ParameterBody...
+ .withBody(
+ ParameterBody.params(
+ Parameter.param(CLIENT_ID, "joebob"),
+ Parameter.param(GRANT_TYPE, REFRESH_TOKEN),
+ Parameter.param(REFRESH_TOKEN, "refresh token key")));
+
+ MockServerClient mockServerClient = new MockServerClient("localhost", PORT);
+
+ // https://tools.ietf.org/html/rfc6749#section-5.1
+ Map map = new TreeMap<>();
+
+ map.put(EXPIRES_IN, "0987654321");
+ map.put(TOKEN_TYPE, BEARER);
+ map.put(ACCESS_TOKEN, "new access token");
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ HttpResponse resp = response()
+ .withStatusCode(HttpStatus.SC_OK)
+ .withHeaders(
+ CONTENT_TYPE_APPLICATION_JSON
+ )
+ .withBody(mapper.writeValueAsString(map));
+
+ mockServerClient
+ .when(expectedRequest, exactly(1))
+ .respond(resp);
+
+ assertEquals("new access token", tokenProvider.getAccessToken());
+
+ mockServerClient.verify(expectedRequest);
+
+ mockServerClient.clear(expectedRequest);
+ mockServer.stop();
+ }
+
+}