HDFS-2784. Update hftp and hdfs for host-based token support. Contributed by Kihwal Lee.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239763 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
191db6a907
commit
9eb8f4d267
@ -110,6 +110,9 @@ Trunk (unreleased changes)
|
|||||||
HDFS-2814 NamenodeMXBean does not account for svn revision in the version
|
HDFS-2814 NamenodeMXBean does not account for svn revision in the version
|
||||||
information. (Hitesh Shah via jitendra)
|
information. (Hitesh Shah via jitendra)
|
||||||
|
|
||||||
|
HDFS-2784. Update hftp and hdfs for host-based token support.
|
||||||
|
(Kihwal Lee via jitendra)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HDFS-2477. Optimize computing the diff between a block report and the
|
HDFS-2477. Optimize computing the diff between a block report and the
|
||||||
namenode state. (Tomasz Nykiel via hairong)
|
namenode state. (Tomasz Nykiel via hairong)
|
||||||
|
@ -631,7 +631,7 @@ public long renew(Token<?> token, Configuration conf) throws IOException {
|
|||||||
DelegationTokenIdentifier.stringifyToken(delToken));
|
DelegationTokenIdentifier.stringifyToken(delToken));
|
||||||
ClientProtocol nn =
|
ClientProtocol nn =
|
||||||
DFSUtil.createNamenode
|
DFSUtil.createNamenode
|
||||||
(NameNode.getAddress(token.getService().toString()),
|
(SecurityUtil.getTokenServiceAddr(delToken),
|
||||||
conf, UserGroupInformation.getCurrentUser());
|
conf, UserGroupInformation.getCurrentUser());
|
||||||
try {
|
try {
|
||||||
return nn.renewDelegationToken(delToken);
|
return nn.renewDelegationToken(delToken);
|
||||||
@ -649,7 +649,7 @@ public void cancel(Token<?> token, Configuration conf) throws IOException {
|
|||||||
LOG.info("Cancelling " +
|
LOG.info("Cancelling " +
|
||||||
DelegationTokenIdentifier.stringifyToken(delToken));
|
DelegationTokenIdentifier.stringifyToken(delToken));
|
||||||
ClientProtocol nn = DFSUtil.createNamenode(
|
ClientProtocol nn = DFSUtil.createNamenode(
|
||||||
NameNode.getAddress(token.getService().toString()), conf,
|
SecurityUtil.getTokenServiceAddr(delToken), conf,
|
||||||
UserGroupInformation.getCurrentUser());
|
UserGroupInformation.getCurrentUser());
|
||||||
try {
|
try {
|
||||||
nn.cancelDelegationToken(delToken);
|
nn.cancelDelegationToken(delToken);
|
||||||
|
@ -108,45 +108,10 @@ public void initialize(URI uri, Configuration conf) throws IOException {
|
|||||||
|
|
||||||
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
|
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
|
||||||
this.dfs = new DFSClient(namenode, conf, statistics);
|
this.dfs = new DFSClient(namenode, conf, statistics);
|
||||||
this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority());
|
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
|
||||||
this.workingDir = getHomeDirectory();
|
this.workingDir = getHomeDirectory();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Permit paths which explicitly specify the default port. */
|
|
||||||
@Override
|
|
||||||
protected void checkPath(Path path) {
|
|
||||||
URI thisUri = this.getUri();
|
|
||||||
URI thatUri = path.toUri();
|
|
||||||
String thatAuthority = thatUri.getAuthority();
|
|
||||||
if (thatUri.getScheme() != null
|
|
||||||
&& thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
|
|
||||||
&& thatUri.getPort() == NameNode.DEFAULT_PORT
|
|
||||||
&& (thisUri.getPort() == -1 ||
|
|
||||||
thisUri.getPort() == NameNode.DEFAULT_PORT)
|
|
||||||
&& thatAuthority.substring(0,thatAuthority.indexOf(":"))
|
|
||||||
.equalsIgnoreCase(thisUri.getAuthority()))
|
|
||||||
return;
|
|
||||||
super.checkPath(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Normalize paths that explicitly specify the default port. */
|
|
||||||
@Override
|
|
||||||
public Path makeQualified(Path path) {
|
|
||||||
URI thisUri = this.getUri();
|
|
||||||
URI thatUri = path.toUri();
|
|
||||||
String thatAuthority = thatUri.getAuthority();
|
|
||||||
if (thatUri.getScheme() != null
|
|
||||||
&& thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
|
|
||||||
&& thatUri.getPort() == NameNode.DEFAULT_PORT
|
|
||||||
&& thisUri.getPort() == -1
|
|
||||||
&& thatAuthority.substring(0,thatAuthority.indexOf(":"))
|
|
||||||
.equalsIgnoreCase(thisUri.getAuthority())) {
|
|
||||||
path = new Path(thisUri.getScheme(), thisUri.getAuthority(),
|
|
||||||
thatUri.getPath());
|
|
||||||
}
|
|
||||||
return super.makeQualified(path);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path getWorkingDirectory() {
|
public Path getWorkingDirectory() {
|
||||||
return workingDir;
|
return workingDir;
|
||||||
|
@ -59,6 +59,7 @@
|
|||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ServletUtil;
|
import org.apache.hadoop.util.ServletUtil;
|
||||||
import org.xml.sax.Attributes;
|
import org.xml.sax.Attributes;
|
||||||
@ -89,16 +90,19 @@ public class HftpFileSystem extends FileSystem
|
|||||||
|
|
||||||
public static final Text TOKEN_KIND = new Text("HFTP delegation");
|
public static final Text TOKEN_KIND = new Text("HFTP delegation");
|
||||||
|
|
||||||
private String nnHttpUrl;
|
|
||||||
private Text hdfsServiceName;
|
|
||||||
private URI hftpURI;
|
|
||||||
protected InetSocketAddress nnAddr;
|
|
||||||
protected UserGroupInformation ugi;
|
protected UserGroupInformation ugi;
|
||||||
|
private URI hftpURI;
|
||||||
|
|
||||||
|
protected InetSocketAddress nnAddr;
|
||||||
|
protected InetSocketAddress nnSecureAddr;
|
||||||
|
|
||||||
public static final String HFTP_TIMEZONE = "UTC";
|
public static final String HFTP_TIMEZONE = "UTC";
|
||||||
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
||||||
|
|
||||||
private Token<?> delegationToken;
|
private Token<?> delegationToken;
|
||||||
private Token<?> renewToken;
|
private Token<?> renewToken;
|
||||||
|
private static final HftpDelegationTokenSelector hftpTokenSelector =
|
||||||
|
new HftpDelegationTokenSelector();
|
||||||
|
|
||||||
public static final SimpleDateFormat getDateFormat() {
|
public static final SimpleDateFormat getDateFormat() {
|
||||||
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
||||||
@ -115,11 +119,8 @@ protected SimpleDateFormat initialValue() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int getDefaultPort() {
|
protected int getDefaultPort() {
|
||||||
return getDefaultSecurePort();
|
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
||||||
//TODO: un-comment the following once HDFS-7510 is committed.
|
|
||||||
// return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
|
|
||||||
// DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected int getDefaultSecurePort() {
|
protected int getDefaultSecurePort() {
|
||||||
@ -127,112 +128,74 @@ protected int getDefaultSecurePort() {
|
|||||||
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected InetSocketAddress getNamenodeAddr(URI uri) {
|
||||||
|
// use authority so user supplied uri can override port
|
||||||
|
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
|
||||||
|
// must only use the host and the configured https port
|
||||||
|
return NetUtils.createSocketAddrForHost(uri.getHost(), getDefaultSecurePort());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getCanonicalServiceName() {
|
public String getCanonicalServiceName() {
|
||||||
return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
|
// unlike other filesystems, hftp's service is the secure port, not the
|
||||||
|
// actual port in the uri
|
||||||
|
return SecurityUtil.buildTokenService(nnSecureAddr).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildUri(String schema, String host, int port) {
|
|
||||||
StringBuilder sb = new StringBuilder(schema);
|
|
||||||
return sb.append(host).append(":").append(port).toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(final URI name, final Configuration conf)
|
public void initialize(final URI name, final Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super.initialize(name, conf);
|
super.initialize(name, conf);
|
||||||
setConf(conf);
|
setConf(conf);
|
||||||
this.ugi = UserGroupInformation.getCurrentUser();
|
this.ugi = UserGroupInformation.getCurrentUser();
|
||||||
nnAddr = NetUtils.createSocketAddr(name.toString());
|
this.nnAddr = getNamenodeAddr(name);
|
||||||
|
this.nnSecureAddr = getNamenodeSecureAddr(name);
|
||||||
// in case we open connection to hftp of a different cluster
|
|
||||||
// we need to know this cluster https port
|
|
||||||
// if it is not set we assume it is the same cluster or same port
|
|
||||||
int urlPort = conf.getInt("dfs.hftp.https.port", -1);
|
|
||||||
if(urlPort == -1)
|
|
||||||
urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
|
|
||||||
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
|
|
||||||
|
|
||||||
String normalizedNN = NetUtils.normalizeHostName(name.getHost());
|
|
||||||
nnHttpUrl = buildUri("https://", normalizedNN ,urlPort);
|
|
||||||
LOG.debug("using url to get DT:" + nnHttpUrl);
|
|
||||||
try {
|
try {
|
||||||
hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort));
|
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
|
||||||
} catch (URISyntaxException ue) {
|
null, null, null);
|
||||||
throw new IOException("bad uri for hdfs", ue);
|
} catch (URISyntaxException e) {
|
||||||
}
|
throw new IllegalArgumentException(e);
|
||||||
|
|
||||||
// if one uses RPC port different from the Default one,
|
|
||||||
// one should specify what is the setvice name for this delegation token
|
|
||||||
// otherwise it is hostname:RPC_PORT
|
|
||||||
String key = DelegationTokenSelector.SERVICE_NAME_KEY
|
|
||||||
+ SecurityUtil.buildDTServiceName(name,
|
|
||||||
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
|
|
||||||
if(LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Trying to find DT for " + name + " using key=" + key +
|
|
||||||
"; conf=" + conf.get(key, ""));
|
|
||||||
}
|
|
||||||
String nnServiceName = conf.get(key);
|
|
||||||
int nnPort = NameNode.DEFAULT_PORT;
|
|
||||||
if (nnServiceName != null) { // get the real port
|
|
||||||
nnPort = NetUtils.createSocketAddr(nnServiceName,
|
|
||||||
NameNode.DEFAULT_PORT).getPort();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort);
|
|
||||||
hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
|
|
||||||
nnPort));
|
|
||||||
} catch (URISyntaxException ue) {
|
|
||||||
throw new IOException("bad uri for hdfs", ue);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
//try finding a token for this namenode (esp applicable for tasks
|
initDelegationToken();
|
||||||
//using hftp). If there exists one, just set the delegationField
|
}
|
||||||
String hftpServiceName = getCanonicalServiceName();
|
}
|
||||||
for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
|
|
||||||
Text kind = t.getKind();
|
|
||||||
if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) {
|
|
||||||
if (t.getService().equals(hdfsServiceName)) {
|
|
||||||
setDelegationToken(t);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (TOKEN_KIND.equals(kind)) {
|
|
||||||
if (hftpServiceName
|
|
||||||
.equals(normalizeService(t.getService().toString()))) {
|
|
||||||
setDelegationToken(t);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//since we don't already have a token, go get one over https
|
protected void initDelegationToken() throws IOException {
|
||||||
if (delegationToken == null) {
|
// look for hftp token, then try hdfs
|
||||||
setDelegationToken(getDelegationToken(null));
|
Token<?> token = selectHftpDelegationToken();
|
||||||
|
if (token == null) {
|
||||||
|
token = selectHdfsDelegationToken();
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we don't already have a token, go get one over https
|
||||||
|
boolean createdToken = false;
|
||||||
|
if (token == null) {
|
||||||
|
token = getDelegationToken(null);
|
||||||
|
createdToken = (token != null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// we already had a token or getDelegationToken() didn't fail.
|
||||||
|
if (token != null) {
|
||||||
|
setDelegationToken(token);
|
||||||
|
if (createdToken) {
|
||||||
dtRenewer.addRenewAction(this);
|
dtRenewer.addRenewAction(this);
|
||||||
|
LOG.debug("Created new DT for " + token.getService());
|
||||||
|
} else {
|
||||||
|
LOG.debug("Found existing DT for " + token.getService());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String normalizeService(String service) {
|
protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
|
||||||
int colonIndex = service.indexOf(':');
|
Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
|
||||||
if (colonIndex == -1) {
|
return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
|
||||||
throw new IllegalArgumentException("Invalid service for hftp token: " +
|
|
||||||
service);
|
|
||||||
}
|
|
||||||
String hostname =
|
|
||||||
NetUtils.normalizeHostName(service.substring(0, colonIndex));
|
|
||||||
String port = service.substring(colonIndex + 1);
|
|
||||||
return hostname + ":" + port;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: un-comment the following once HDFS-7510 is committed.
|
|
||||||
// protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
|
|
||||||
// Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
|
|
||||||
// return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
|
|
||||||
// }
|
|
||||||
|
|
||||||
protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
|
protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
|
||||||
return DelegationTokenSelector.selectHdfsDelegationToken(
|
return DelegationTokenSelector.selectHdfsDelegationToken(
|
||||||
nnAddr, ugi, getConf());
|
nnAddr, ugi, getConf());
|
||||||
@ -245,13 +208,17 @@ public Token<?> getRenewToken() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
||||||
renewToken = token;
|
renewToken = token;
|
||||||
// emulate the 203 usage of the tokens
|
// emulate the 203 usage of the tokens
|
||||||
// by setting the kind and service as if they were hdfs tokens
|
// by setting the kind and service as if they were hdfs tokens
|
||||||
delegationToken = new Token<T>(token);
|
delegationToken = new Token<T>(token);
|
||||||
|
// NOTE: the remote nn must be configured to use hdfs
|
||||||
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
||||||
delegationToken.setService(hdfsServiceName);
|
// no need to change service because we aren't exactly sure what it
|
||||||
|
// should be. we can guess, but it might be wrong if the local conf
|
||||||
|
// value is incorrect. the service is a client side field, so the remote
|
||||||
|
// end does not care about the value
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -262,6 +229,7 @@ public synchronized Token<?> getDelegationToken(final String renewer
|
|||||||
ugi.reloginFromKeytab();
|
ugi.reloginFromKeytab();
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||||
public Token<?> run() throws IOException {
|
public Token<?> run() throws IOException {
|
||||||
|
final String nnHttpUrl = DFSUtil.createUri("https", nnSecureAddr).toString();
|
||||||
Credentials c;
|
Credentials c;
|
||||||
try {
|
try {
|
||||||
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
|
||||||
@ -291,12 +259,7 @@ public Token<?> run() throws IOException {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public URI getUri() {
|
public URI getUri() {
|
||||||
try {
|
return hftpURI;
|
||||||
return new URI("hftp", null, nnAddr.getHostName(), nnAddr.getPort(),
|
|
||||||
null, null, null);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -722,11 +685,12 @@ public boolean isManaged(Token<?> token) throws IOException {
|
|||||||
public long renew(Token<?> token,
|
public long renew(Token<?> token,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
// update the kerberos credentials, if they are coming from a keytab
|
// update the kerberos credentials, if they are coming from a keytab
|
||||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||||
// use https to renew the token
|
// use https to renew the token
|
||||||
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||||
return
|
return
|
||||||
DelegationTokenFetcher.renewDelegationToken
|
DelegationTokenFetcher.renewDelegationToken
|
||||||
("https://" + token.getService().toString(),
|
(DFSUtil.createUri("https", serviceAddr).toString(),
|
||||||
(Token<DelegationTokenIdentifier>) token);
|
(Token<DelegationTokenIdentifier>) token);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -737,10 +701,18 @@ public void cancel(Token<?> token,
|
|||||||
// update the kerberos credentials, if they are coming from a keytab
|
// update the kerberos credentials, if they are coming from a keytab
|
||||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||||
// use https to cancel the token
|
// use https to cancel the token
|
||||||
|
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||||
DelegationTokenFetcher.cancelDelegationToken
|
DelegationTokenFetcher.cancelDelegationToken
|
||||||
("https://" + token.getService().toString(),
|
(DFSUtil.createUri("https", serviceAddr).toString(),
|
||||||
(Token<DelegationTokenIdentifier>) token);
|
(Token<DelegationTokenIdentifier>) token);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class HftpDelegationTokenSelector
|
||||||
|
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||||
|
|
||||||
|
public HftpDelegationTokenSelector() {
|
||||||
|
super(TOKEN_KIND);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
@ -120,6 +121,16 @@ private static void setupSsl(Configuration conf) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getDefaultPort() {
|
||||||
|
return getDefaultSecurePort();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
|
||||||
|
return getNamenodeAddr(uri);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected HttpURLConnection openConnection(String path, String query)
|
protected HttpURLConnection openConnection(String path, String query)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
@ -161,16 +172,6 @@ protected HttpURLConnection openConnection(String path, String query)
|
|||||||
return (HttpURLConnection) conn;
|
return (HttpURLConnection) conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public URI getUri() {
|
|
||||||
try {
|
|
||||||
return new URI("hsftp", null, nnAddr.getHostName(), nnAddr.getPort(),
|
|
||||||
null, null, null);
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dummy hostname verifier that is used to bypass hostname checking
|
* Dummy hostname verifier that is used to bypass hostname checking
|
||||||
*/
|
*/
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
@ -296,8 +297,7 @@ public static Credentials createCredentials(final NameNode namenode,
|
|||||||
}
|
}
|
||||||
|
|
||||||
final InetSocketAddress addr = namenode.getNameNodeAddress();
|
final InetSocketAddress addr = namenode.getNameNodeAddress();
|
||||||
final String s = addr.getAddress().getHostAddress() + ":" + addr.getPort();
|
SecurityUtil.setTokenService(token, addr);
|
||||||
token.setService(new Text(s));
|
|
||||||
final Credentials c = new Credentials();
|
final Credentials c = new Credentials();
|
||||||
c.addToken(new Text(ugi.getShortUserName()), token);
|
c.addToken(new Text(ugi.getShortUserName()), token);
|
||||||
return c;
|
return c;
|
||||||
|
@ -62,6 +62,7 @@
|
|||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||||
import org.apache.hadoop.security.authentication.util.KerberosName;
|
import org.apache.hadoop.security.authentication.util.KerberosName;
|
||||||
@ -492,7 +493,7 @@ public static UserGroupInformation getDefaultWebUser(Configuration conf
|
|||||||
return UserGroupInformation.createRemoteUser(strings[0]);
|
return UserGroupInformation.createRemoteUser(strings[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getNNServiceAddress(ServletContext context,
|
private static InetSocketAddress getNNServiceAddress(ServletContext context,
|
||||||
HttpServletRequest request) {
|
HttpServletRequest request) {
|
||||||
String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
|
String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
|
||||||
InetSocketAddress namenodeAddress = null;
|
InetSocketAddress namenodeAddress = null;
|
||||||
@ -503,8 +504,7 @@ private static String getNNServiceAddress(ServletContext context,
|
|||||||
context);
|
context);
|
||||||
}
|
}
|
||||||
if (namenodeAddress != null) {
|
if (namenodeAddress != null) {
|
||||||
return (namenodeAddress.getAddress().getHostAddress() + ":"
|
return namenodeAddress;
|
||||||
+ namenodeAddress.getPort());
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -547,9 +547,9 @@ public static UserGroupInformation getUGI(ServletContext context,
|
|||||||
Token<DelegationTokenIdentifier> token =
|
Token<DelegationTokenIdentifier> token =
|
||||||
new Token<DelegationTokenIdentifier>();
|
new Token<DelegationTokenIdentifier>();
|
||||||
token.decodeFromUrlString(tokenString);
|
token.decodeFromUrlString(tokenString);
|
||||||
String serviceAddress = getNNServiceAddress(context, request);
|
InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
|
||||||
if (serviceAddress != null) {
|
if (serviceAddress != null) {
|
||||||
token.setService(new Text(serviceAddress));
|
SecurityUtil.setTokenService(token, serviceAddress);
|
||||||
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
||||||
}
|
}
|
||||||
ByteArrayInputStream buf = new ByteArrayInputStream(token
|
ByteArrayInputStream buf = new ByteArrayInputStream(token
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.URLConnection;
|
import java.net.URLConnection;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
@ -49,6 +50,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
|
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -204,6 +206,7 @@ public Object run() throws Exception {
|
|||||||
static public Credentials getDTfromRemote(String nnAddr,
|
static public Credentials getDTfromRemote(String nnAddr,
|
||||||
String renewer) throws IOException {
|
String renewer) throws IOException {
|
||||||
DataInputStream dis = null;
|
DataInputStream dis = null;
|
||||||
|
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
StringBuffer url = new StringBuffer();
|
StringBuffer url = new StringBuffer();
|
||||||
@ -229,9 +232,7 @@ static public Credentials getDTfromRemote(String nnAddr,
|
|||||||
ts.readFields(dis);
|
ts.readFields(dis);
|
||||||
for(Token<?> token: ts.getAllTokens()) {
|
for(Token<?> token: ts.getAllTokens()) {
|
||||||
token.setKind(HftpFileSystem.TOKEN_KIND);
|
token.setKind(HftpFileSystem.TOKEN_KIND);
|
||||||
token.setService(new Text(SecurityUtil.buildDTServiceName
|
SecurityUtil.setTokenService(token, serviceAddr);
|
||||||
(remoteURL.toURI(),
|
|
||||||
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT)));
|
|
||||||
}
|
}
|
||||||
return ts;
|
return ts;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -882,6 +882,8 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
|
|||||||
if(dn == null)
|
if(dn == null)
|
||||||
throw new IOException("Cannot start DataNode in "
|
throw new IOException("Cannot start DataNode in "
|
||||||
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
|
+ dnConf.get(DFS_DATANODE_DATA_DIR_KEY));
|
||||||
|
//NOTE: the following is true if and only if:
|
||||||
|
// hadoop.security.token.service.use_ip=true
|
||||||
//since the HDFS does things based on IP:port, we need to add the mapping
|
//since the HDFS does things based on IP:port, we need to add the mapping
|
||||||
//for IP:port to rackId
|
//for IP:port to rackId
|
||||||
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
|
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
@ -232,4 +233,164 @@ public void testSeek() throws IOException {
|
|||||||
in.seek(7);
|
in.seek(7);
|
||||||
assertEquals('7', in.read());
|
assertEquals('7', in.read());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void resetFileSystem() throws IOException {
|
||||||
|
// filesystem caching has a quirk/bug that it caches based on the user's
|
||||||
|
// given uri. the result is if a filesystem is instantiated with no port,
|
||||||
|
// it gets the default port. then if the default port is changed,
|
||||||
|
// and another filesystem is instantiated with no port, the prior fs
|
||||||
|
// is returned, not a new one using the changed port. so let's flush
|
||||||
|
// the cache between tests...
|
||||||
|
FileSystem.closeAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHftpDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI uri = URI.create("hftp://localhost");
|
||||||
|
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHftpCustomDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt("dfs.http.port", 123);
|
||||||
|
conf.setInt("dfs.https.port", 456);
|
||||||
|
|
||||||
|
URI uri = URI.create("hftp://localhost");
|
||||||
|
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(123, fs.getDefaultPort());
|
||||||
|
assertEquals(456, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:456",
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI uri = URI.create("hftp://localhost:123");
|
||||||
|
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt("dfs.http.port", 123);
|
||||||
|
conf.setInt("dfs.https.port", 456);
|
||||||
|
|
||||||
|
URI uri = URI.create("hftp://localhost:789");
|
||||||
|
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(123, fs.getDefaultPort());
|
||||||
|
assertEquals(456, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:456",
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
///
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsftpDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI uri = URI.create("hsftp://localhost");
|
||||||
|
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsftpCustomDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt("dfs.http.port", 123);
|
||||||
|
conf.setInt("dfs.https.port", 456);
|
||||||
|
|
||||||
|
URI uri = URI.create("hsftp://localhost");
|
||||||
|
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(456, fs.getDefaultPort());
|
||||||
|
assertEquals(456, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:456",
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI uri = URI.create("hsftp://localhost:123");
|
||||||
|
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
|
||||||
|
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:123",
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
|
||||||
|
resetFileSystem();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt("dfs.http.port", 123);
|
||||||
|
conf.setInt("dfs.https.port", 456);
|
||||||
|
|
||||||
|
URI uri = URI.create("hsftp://localhost:789");
|
||||||
|
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
|
||||||
|
|
||||||
|
assertEquals(456, fs.getDefaultPort());
|
||||||
|
assertEquals(456, fs.getDefaultSecurePort());
|
||||||
|
|
||||||
|
assertEquals(uri, fs.getUri());
|
||||||
|
assertEquals(
|
||||||
|
"127.0.0.1:789",
|
||||||
|
fs.getCanonicalServiceName()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
<!-- Turn off SSL server authentication for tests by default -->
|
||||||
|
<property>
|
||||||
|
<name>ssl.client.do.not.authenticate.server</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue
Block a user