HDFS-5255. Distcp job fails with hsftp when https is enabled in insecure cluster.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1528279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-02 00:03:15 +00:00
parent 4e9c652c52
commit b99d160437
7 changed files with 79 additions and 63 deletions

View File

@ -382,6 +382,9 @@ Release 2.1.2 - UNRELEASED
HDFS-5265. Namenode fails to start when dfs.https.port is unspecified. HDFS-5265. Namenode fails to start when dfs.https.port is unspecified.
(Haohui Mai via jing9) (Haohui Mai via jing9)
HDFS-5255. Distcp job fails with hsftp when https is enabled in insecure
cluster. (Arpit Agarwal)
Release 2.1.1-beta - 2013-09-23 Release 2.1.1-beta - 2013-09-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -94,7 +94,6 @@ public class HftpFileSystem extends FileSystem
private URI hftpURI; private URI hftpURI;
protected URI nnUri; protected URI nnUri;
protected URI nnSecureUri;
public static final String HFTP_TIMEZONE = "UTC"; public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
@ -134,34 +133,33 @@ protected int getDefaultPort() {
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT); DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
} }
protected int getDefaultSecurePort() { /**
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, * We generate the address with one of the following ports, in
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT); * order of preference.
} * 1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000.
* 2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY
* 3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070.
*
* @param uri
* @return
*/
protected InetSocketAddress getNamenodeAddr(URI uri) { protected InetSocketAddress getNamenodeAddr(URI uri) {
// use authority so user supplied uri can override port // use authority so user supplied uri can override port
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort()); return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
} }
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
// must only use the host and the configured https port
return NetUtils.createSocketAddrForHost(uri.getHost(), getDefaultSecurePort());
}
protected URI getNamenodeUri(URI uri) { protected URI getNamenodeUri(URI uri) {
return DFSUtil.createUri("http", getNamenodeAddr(uri)); return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri));
}
protected URI getNamenodeSecureUri(URI uri) {
return DFSUtil.createUri("http", getNamenodeSecureAddr(uri));
} }
/**
* See the documentation of {@Link #getNamenodeAddr(URI)} for the logic
* behind selecting the canonical service name.
* @return
*/
@Override @Override
public String getCanonicalServiceName() { public String getCanonicalServiceName() {
// unlike other filesystems, hftp's service is the secure port, not the return SecurityUtil.buildTokenService(nnUri).toString();
// actual port in the uri
return SecurityUtil.buildTokenService(nnSecureUri).toString();
} }
@Override @Override
@ -187,7 +185,6 @@ public void initialize(final URI name, final Configuration conf)
setConf(conf); setConf(conf);
this.ugi = UserGroupInformation.getCurrentUser(); this.ugi = UserGroupInformation.getCurrentUser();
this.nnUri = getNamenodeUri(name); this.nnUri = getNamenodeUri(name);
this.nnSecureUri = getNamenodeSecureUri(name);
try { try {
this.hftpURI = new URI(name.getScheme(), name.getAuthority(), this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
null, null, null); null, null, null);
@ -225,7 +222,7 @@ protected void initDelegationToken() throws IOException {
protected Token<DelegationTokenIdentifier> selectDelegationToken( protected Token<DelegationTokenIdentifier> selectDelegationToken(
UserGroupInformation ugi) { UserGroupInformation ugi) {
return hftpTokenSelector.selectToken(nnSecureUri, ugi.getTokens(), getConf()); return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
} }
@ -234,6 +231,13 @@ public Token<?> getRenewToken() {
return renewToken; return renewToken;
} }
/**
* Return the underlying protocol that is used to talk to the namenode.
*/
protected String getUnderlyingProtocol() {
return "http";
}
@Override @Override
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) { public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
renewToken = token; renewToken = token;
@ -257,7 +261,7 @@ public synchronized Token<?> getDelegationToken(final String renewer
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() { return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override @Override
public Token<?> run() throws IOException { public Token<?> run() throws IOException {
final String nnHttpUrl = nnSecureUri.toString(); final String nnHttpUrl = nnUri.toString();
Credentials c; Credentials c;
try { try {
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer); c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
@ -301,7 +305,7 @@ public URI getUri() {
* @throws IOException on error constructing the URL * @throws IOException on error constructing the URL
*/ */
protected URL getNamenodeURL(String path, String query) throws IOException { protected URL getNamenodeURL(String path, String query) throws IOException {
final URL url = new URL("http", nnUri.getHost(), final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
nnUri.getPort(), path + '?' + query); nnUri.getPort(), path + '?' + query);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("url=" + url); LOG.trace("url=" + url);
@ -703,17 +707,20 @@ public boolean isManaged(Token<?> token) throws IOException {
return true; return true;
} }
protected String getUnderlyingProtocol() {
return "http";
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public long renew(Token<?> token, public long renew(Token<?> token,
Configuration conf) throws IOException { Configuration conf) throws IOException {
// update the kerberos credentials, if they are coming from a keytab // update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
// use http to renew the token
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
return return
DelegationTokenFetcher.renewDelegationToken DelegationTokenFetcher.renewDelegationToken
(DFSUtil.createUri("http", serviceAddr).toString(), (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token); (Token<DelegationTokenIdentifier>) token);
} }
@ -723,10 +730,9 @@ public void cancel(Token<?> token,
Configuration conf) throws IOException { Configuration conf) throws IOException {
// update the kerberos credentials, if they are coming from a keytab // update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
// use http to cancel the token
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token); InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
DelegationTokenFetcher.cancelDelegationToken DelegationTokenFetcher.cancelDelegationToken
(DFSUtil.createUri("http", serviceAddr).toString(), (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token); (Token<DelegationTokenIdentifier>) token);
} }
} }

View File

@ -68,6 +68,14 @@ public String getScheme() {
return "hsftp"; return "hsftp";
} }
/**
* Return the underlying protocol that is used to talk to the namenode.
*/
@Override
protected String getUnderlyingProtocol() {
return "https";
}
@Override @Override
public void initialize(URI name, Configuration conf) throws IOException { public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf); super.initialize(name, conf);
@ -134,24 +142,15 @@ private static void setupSsl(Configuration conf) throws IOException {
@Override @Override
protected int getDefaultPort() { protected int getDefaultPort() {
return getDefaultSecurePort(); return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
} }
@Override
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
return getNamenodeAddr(uri);
}
@Override
protected URI getNamenodeUri(URI uri) {
return getNamenodeSecureUri(uri);
}
@Override @Override
protected HttpURLConnection openConnection(String path, String query) protected HttpURLConnection openConnection(String path, String query)
throws IOException { throws IOException {
query = addDelegationTokenParam(query); query = addDelegationTokenParam(query);
final URL url = new URL("https", nnUri.getHost(), final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
nnUri.getPort(), path + '?' + query); nnUri.getPort(), path + '?' + query);
HttpsURLConnection conn; HttpsURLConnection conn;
conn = (HttpsURLConnection)connectionFactory.openConnection(url); conn = (HttpsURLConnection)connectionFactory.openConnection(url);

View File

@ -57,9 +57,14 @@ private URL createRedirectURL(UserGroupInformation ugi, DatanodeID host,
final String hostname = host instanceof DatanodeInfo final String hostname = host instanceof DatanodeInfo
? ((DatanodeInfo)host).getHostName() : host.getIpAddr(); ? ((DatanodeInfo)host).getHostName() : host.getIpAddr();
final String scheme = request.getScheme(); final String scheme = request.getScheme();
final int port = "https".equals(scheme) int port = host.getInfoPort();
? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY) if ("https".equals(scheme)) {
: host.getInfoPort(); final Integer portObject = (Integer) getServletContext().getAttribute(
DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
if (portObject != null) {
port = portObject;
}
}
final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum"); final String encodedPath = ServletUtil.getRawPath(request, "/fileChecksum");
String dtParam = ""; String dtParam = "";

View File

@ -61,9 +61,14 @@ private URL createRedirectURL(String path, String encodedPath, HdfsFileStatus st
} else { } else {
hostname = host.getIpAddr(); hostname = host.getIpAddr();
} }
final int port = "https".equals(scheme) int port = host.getInfoPort();
? (Integer)getServletContext().getAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY) if ("https".equals(scheme)) {
: host.getInfoPort(); final Integer portObject = (Integer) getServletContext().getAttribute(
DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY);
if (portObject != null) {
port = portObject;
}
}
String dtParam = ""; String dtParam = "";
if (dt != null) { if (dt != null) {

View File

@ -88,19 +88,21 @@ public void testSelectHftpDelegationToken() throws Exception {
URI fsUri = URI.create("hftp://localhost"); URI fsUri = URI.create("hftp://localhost");
MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort()); assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf); // should still use secure port checkTokenSelection(fs, httpPort, conf);
// test with explicit default port // test with explicit default port
// Make sure it uses the port from the hftp URI.
fsUri = URI.create("hftp://localhost:"+httpPort); fsUri = URI.create("hftp://localhost:"+httpPort);
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort, fs.getCanonicalUri().getPort()); assertEquals(httpPort, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf); // should still use secure port checkTokenSelection(fs, httpPort, conf);
// test with non-default port // test with non-default port
// Make sure it uses the port from the hftp URI.
fsUri = URI.create("hftp://localhost:"+(httpPort+1)); fsUri = URI.create("hftp://localhost:"+(httpPort+1));
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf); fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
assertEquals(httpPort+1, fs.getCanonicalUri().getPort()); assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
checkTokenSelection(fs, httpsPort, conf); // should still use secure port checkTokenSelection(fs, httpPort + 1, conf);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
} }
@ -178,7 +180,7 @@ public void run() {
} }
assertNotNull(ex); assertNotNull(ex);
assertNotNull(ex.getCause()); assertNotNull(ex.getCause());
assertEquals("Unexpected end of file from server", assertEquals("Remote host closed connection during handshake",
ex.getCause().getMessage()); ex.getCause().getMessage());
} finally { } finally {
t.interrupt(); t.interrupt();

View File

@ -294,11 +294,13 @@ public void testHftpDefaultPorts() throws IOException {
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
// HFTP uses http to get the token so canonical service name should
// return the http port.
assertEquals( assertEquals(
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, "127.0.0.1:" + DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT,
fs.getCanonicalServiceName() fs.getCanonicalServiceName()
); );
} }
@ -307,17 +309,18 @@ public void testHftpDefaultPorts() throws IOException {
public void testHftpCustomDefaultPorts() throws IOException { public void testHftpCustomDefaultPorts() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 123); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 123);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 456);
URI uri = URI.create("hftp://localhost"); URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort()); assertEquals(123, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
// HFTP uses http to get the token so canonical service name should
// return the http port.
assertEquals( assertEquals(
"127.0.0.1:456", "127.0.0.1:123",
fs.getCanonicalServiceName() fs.getCanonicalServiceName()
); );
} }
@ -329,11 +332,10 @@ public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, "127.0.0.1:123",
fs.getCanonicalServiceName() fs.getCanonicalServiceName()
); );
} }
@ -342,17 +344,15 @@ public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException { public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 123); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, 123);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 456);
URI uri = URI.create("hftp://localhost:789"); URI uri = URI.create("hftp://localhost:789");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf); HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort()); assertEquals(123, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(
"127.0.0.1:456", "127.0.0.1:789",
fs.getCanonicalServiceName() fs.getCanonicalServiceName()
); );
} }
@ -366,7 +366,6 @@ public void testHsftpDefaultPorts() throws IOException {
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(
@ -385,7 +384,6 @@ public void testHsftpCustomDefaultPorts() throws IOException {
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(456, fs.getDefaultPort()); assertEquals(456, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(
@ -401,7 +399,6 @@ public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort()); assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(
@ -420,7 +417,6 @@ public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf); HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(456, fs.getDefaultPort()); assertEquals(456, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri()); assertEquals(uri, fs.getUri());
assertEquals( assertEquals(