HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem to the TokenAspect class. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541665 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c591aa442
commit
1c211f6749
@ -477,6 +477,9 @@ Release 2.3.0 - UNRELEASED
|
||||
HDFS-5495. Remove further JUnit3 usages from HDFS.
|
||||
(Jarek Jarcec Cecho via wang)
|
||||
|
||||
HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem
|
||||
to the TokenAspect class. (Haohui Mai via jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
|
||||
|
@ -31,7 +31,6 @@
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -50,10 +49,8 @@
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
|
||||
import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
@ -62,8 +59,6 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.ServletUtil;
|
||||
import org.xml.sax.Attributes;
|
||||
@ -83,7 +78,9 @@
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class HftpFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable {
|
||||
implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
|
||||
public static final String SCHEME = "hftp";
|
||||
|
||||
static {
|
||||
HttpURLConnection.setFollowRedirects(true);
|
||||
}
|
||||
@ -100,19 +97,13 @@ public class HftpFileSystem extends FileSystem
|
||||
public static final String HFTP_TIMEZONE = "UTC";
|
||||
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
||||
|
||||
private TokenAspect<HftpFileSystem> tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
|
||||
private Token<?> delegationToken;
|
||||
private Token<?> renewToken;
|
||||
private static final HftpDelegationTokenSelector hftpTokenSelector =
|
||||
new HftpDelegationTokenSelector();
|
||||
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
|
||||
private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
|
||||
if (dtRenewer == null) {
|
||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
||||
dtRenewer.addRenewAction(hftpFs);
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
||||
return super.getCanonicalUri();
|
||||
}
|
||||
|
||||
public static final SimpleDateFormat getDateFormat() {
|
||||
@ -177,7 +168,7 @@ protected URI canonicalizeUri(URI uri) {
|
||||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return "hftp";
|
||||
return SCHEME;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -195,39 +186,10 @@ public void initialize(final URI name, final Configuration conf)
|
||||
}
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
initDelegationToken();
|
||||
tokenAspect.initDelegationToken(ugi);
|
||||
}
|
||||
}
|
||||
|
||||
protected void initDelegationToken() throws IOException {
|
||||
// look for hftp token, then try hdfs
|
||||
Token<?> token = selectDelegationToken(ugi);
|
||||
|
||||
// if we don't already have a token, go get one over https
|
||||
boolean createdToken = false;
|
||||
if (token == null) {
|
||||
token = getDelegationToken(null);
|
||||
createdToken = (token != null);
|
||||
}
|
||||
|
||||
// we already had a token or getDelegationToken() didn't fail.
|
||||
if (token != null) {
|
||||
setDelegationToken(token);
|
||||
if (createdToken) {
|
||||
addRenewAction(this);
|
||||
LOG.debug("Created new DT for " + token.getService());
|
||||
} else {
|
||||
LOG.debug("Found existing DT for " + token.getService());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected Token<DelegationTokenIdentifier> selectDelegationToken(
|
||||
UserGroupInformation ugi) {
|
||||
return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Token<?> getRenewToken() {
|
||||
return renewToken;
|
||||
@ -242,16 +204,19 @@ protected String getUnderlyingProtocol() {
|
||||
|
||||
@Override
|
||||
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
||||
/**
|
||||
* XXX The kind of the token has been changed by DelegationTokenFetcher. We
|
||||
* use the token for renewal, since the reflection utilities needs the value
|
||||
* of the kind field to correctly renew the token.
|
||||
*
|
||||
* For other operations, however, the client has to send a
|
||||
* HDFS_DELEGATION_KIND token over the wire so that it can talk to Hadoop
|
||||
* 0.20.3 clusters. Later releases fix this problem. See HDFS-5440 for more
|
||||
* details.
|
||||
*/
|
||||
renewToken = token;
|
||||
// emulate the 203 usage of the tokens
|
||||
// by setting the kind and service as if they were hdfs tokens
|
||||
delegationToken = new Token<T>(token);
|
||||
// NOTE: the remote nn must be configured to use hdfs
|
||||
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
||||
// no need to change service because we aren't exactly sure what it
|
||||
// should be. we can guess, but it might be wrong if the local conf
|
||||
// value is incorrect. the service is a client side field, so the remote
|
||||
// end does not care about the value
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -350,6 +315,7 @@ protected String addDelegationTokenParam(String query) throws IOException {
|
||||
String tokenString = null;
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
synchronized (this) {
|
||||
tokenAspect.ensureTokenInitialized();
|
||||
if (delegationToken != null) {
|
||||
tokenString = delegationToken.encodeToUrlString();
|
||||
return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
|
||||
@ -419,9 +385,7 @@ public FSDataInputStream open(Path f, int buffersize) throws IOException {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (dtRenewer != null) {
|
||||
dtRenewer.removeRenewAction(this); // blocks
|
||||
}
|
||||
tokenAspect.removeRenewAction();
|
||||
}
|
||||
|
||||
/** Class to parse and store a listing reply from the server. */
|
||||
@ -696,67 +660,26 @@ public ContentSummary getContentSummary(Path f) throws IOException {
|
||||
return cs != null? cs: super.getContentSummary(f);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public static class TokenManager extends TokenRenewer {
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
protected String getUnderlyingProtocol() {
|
||||
return "http";
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public long renew(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
return
|
||||
DelegationTokenFetcher.renewDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void cancel(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
DelegationTokenFetcher.cancelDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public long renewDelegationToken(Token<?> token) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
return
|
||||
DelegationTokenFetcher.renewDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
}
|
||||
|
||||
private static class HftpDelegationTokenSelector
|
||||
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||
private static final DelegationTokenSelector hdfsTokenSelector =
|
||||
new DelegationTokenSelector();
|
||||
|
||||
public HftpDelegationTokenSelector() {
|
||||
super(TOKEN_KIND);
|
||||
}
|
||||
|
||||
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
|
||||
Collection<Token<?>> tokens, Configuration conf) {
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
|
||||
if (token == null) {
|
||||
// try to get a HDFS token
|
||||
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void cancelDelegationToken(Token<?> token) throws IOException {
|
||||
// update the kerberos credentials, if they are coming from a keytab
|
||||
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
|
||||
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
DelegationTokenFetcher.cancelDelegationToken
|
||||
(DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
|
||||
(Token<DelegationTokenIdentifier>) token);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,177 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This class implements the aspects that relate to delegation tokens for all
|
||||
* HTTP-based file system.
|
||||
*/
|
||||
final class TokenAspect<T extends FileSystem & Renewable> {
|
||||
@InterfaceAudience.Private
|
||||
public static class TokenManager extends TokenRenewer {
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
getInstance(token, conf).cancelDelegationToken(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(HftpFileSystem.TOKEN_KIND)
|
||||
|| kind.equals(WebHdfsFileSystem.TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
return getInstance(token, conf).renewDelegationToken(token);
|
||||
}
|
||||
|
||||
private TokenManagementDelegator getInstance(Token<?> token,
|
||||
Configuration conf) throws IOException {
|
||||
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(token);
|
||||
Text kind = token.getKind();
|
||||
final URI uri;
|
||||
if (kind.equals(HftpFileSystem.TOKEN_KIND)) {
|
||||
uri = DFSUtil.createUri(HftpFileSystem.SCHEME, address);
|
||||
} else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
|
||||
uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, address);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported scheme");
|
||||
}
|
||||
return (TokenManagementDelegator) FileSystem.get(uri, conf);
|
||||
}
|
||||
}
|
||||
|
||||
static class DTSelecorByKind extends
|
||||
AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||
private static final DelegationTokenSelector selector = new DelegationTokenSelector();
|
||||
|
||||
public DTSelecorByKind(final Text kind) {
|
||||
super(kind);
|
||||
}
|
||||
|
||||
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
|
||||
Collection<Token<?>> tokens, Configuration conf) {
|
||||
Token<DelegationTokenIdentifier> token = selectToken(
|
||||
SecurityUtil.buildTokenService(nnUri), tokens);
|
||||
if (token == null) {
|
||||
token = selector.selectToken(nnUri, tokens, conf);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Callbacks for token management
|
||||
*/
|
||||
interface TokenManagementDelegator {
|
||||
void cancelDelegationToken(final Token<?> token) throws IOException;
|
||||
|
||||
URI getCanonicalUri();
|
||||
|
||||
long renewDelegationToken(final Token<?> token) throws IOException;
|
||||
}
|
||||
|
||||
private DelegationTokenRenewer.RenewAction<?> action;
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
private final DTSelecorByKind dtSelector;
|
||||
private final T fs;
|
||||
private boolean hasInitedToken;
|
||||
private final Log LOG;
|
||||
|
||||
TokenAspect(T fs, final Text kind) {
|
||||
this.LOG = LogFactory.getLog(fs.getClass());
|
||||
this.fs = fs;
|
||||
this.dtSelector = new DTSelecorByKind(kind);
|
||||
}
|
||||
|
||||
synchronized void ensureTokenInitialized() throws IOException {
|
||||
// we haven't inited yet, or we used to have a token but it expired
|
||||
if (!hasInitedToken || (action != null && !action.isValid())) {
|
||||
//since we don't already have a token, go get one
|
||||
Token<?> token = fs.getDelegationToken(null);
|
||||
// security might be disabled
|
||||
if (token != null) {
|
||||
fs.setDelegationToken(token);
|
||||
addRenewAction(fs);
|
||||
LOG.debug("Created new DT for " + token.getService());
|
||||
}
|
||||
hasInitedToken = true;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void initDelegationToken(UserGroupInformation ugi) {
|
||||
Token<?> token = selectDelegationToken(ugi);
|
||||
if (token != null) {
|
||||
LOG.debug("Found existing DT for " + token.getService());
|
||||
fs.setDelegationToken(token);
|
||||
hasInitedToken = true;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void removeRenewAction() throws IOException {
|
||||
if (dtRenewer != null) {
|
||||
dtRenewer.removeRenewAction(fs);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Token<DelegationTokenIdentifier> selectDelegationToken(
|
||||
UserGroupInformation ugi) {
|
||||
return dtSelector.selectToken(
|
||||
((TokenManagementDelegator)fs).getCanonicalUri(), ugi.getTokens(),
|
||||
fs.getConf());
|
||||
}
|
||||
|
||||
private synchronized void addRenewAction(final T webhdfs) {
|
||||
if (dtRenewer == null) {
|
||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
||||
action = dtRenewer.addRenewAction(webhdfs);
|
||||
}
|
||||
}
|
@ -30,7 +30,6 @@
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
@ -56,8 +55,8 @@
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
|
||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||
import org.apache.hadoop.hdfs.web.TokenAspect.DTSelecorByKind;
|
||||
import org.apache.hadoop.hdfs.web.resources.AccessTimeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
@ -96,8 +95,6 @@
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
@ -107,7 +104,7 @@
|
||||
|
||||
/** A FileSystem for HDFS over the web. */
|
||||
public class WebHdfsFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable {
|
||||
implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
|
||||
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
|
||||
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
||||
public static final String SCHEME = "webhdfs";
|
||||
@ -122,13 +119,18 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
/** Delegation token kind */
|
||||
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||
/** Token selector */
|
||||
public static final WebHdfsDelegationTokenSelector DT_SELECTOR
|
||||
= new WebHdfsDelegationTokenSelector();
|
||||
public static final DTSelecorByKind DT_SELECTOR
|
||||
= new DTSelecorByKind(TOKEN_KIND);
|
||||
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
@VisibleForTesting
|
||||
DelegationTokenRenewer.RenewAction<?> action;
|
||||
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
||||
return super.getCanonicalUri();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
if (dtRenewer == null) {
|
||||
@ -142,7 +144,6 @@ protected synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
public static boolean isEnabled(final Configuration conf, final Log log) {
|
||||
final boolean b = conf.getBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT);
|
||||
log.info(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY + " = " + b);
|
||||
return b;
|
||||
}
|
||||
|
||||
@ -986,7 +987,8 @@ public <T extends TokenIdentifier> void setDelegationToken(
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized long renewDelegationToken(final Token<?> token
|
||||
@Override
|
||||
public synchronized long renewDelegationToken(final Token<?> token
|
||||
) throws IOException {
|
||||
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
|
||||
TokenArgumentParam dtargParam = new TokenArgumentParam(
|
||||
@ -995,7 +997,8 @@ private synchronized long renewDelegationToken(final Token<?> token
|
||||
return (Long) m.get("long");
|
||||
}
|
||||
|
||||
private synchronized void cancelDelegationToken(final Token<?> token
|
||||
@Override
|
||||
public synchronized void cancelDelegationToken(final Token<?> token
|
||||
) throws IOException {
|
||||
final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
|
||||
TokenArgumentParam dtargParam = new TokenArgumentParam(
|
||||
@ -1041,57 +1044,4 @@ public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
|
||||
final Map<?, ?> m = run(op, p);
|
||||
return JsonUtil.toMD5MD5CRC32FileChecksum(m);
|
||||
}
|
||||
|
||||
/** Delegation token renewer. */
|
||||
public static class DtRenewer extends TokenRenewer {
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
private static WebHdfsFileSystem getWebHdfs(
|
||||
final Token<?> token, final Configuration conf) throws IOException {
|
||||
|
||||
final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
|
||||
final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
|
||||
return (WebHdfsFileSystem)FileSystem.get(uri, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(final Token<?> token, final Configuration conf
|
||||
) throws IOException, InterruptedException {
|
||||
return getWebHdfs(token, conf).renewDelegationToken(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(final Token<?> token, final Configuration conf
|
||||
) throws IOException, InterruptedException {
|
||||
getWebHdfs(token, conf).cancelDelegationToken(token);
|
||||
}
|
||||
}
|
||||
|
||||
private static class WebHdfsDelegationTokenSelector
|
||||
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
|
||||
private static final DelegationTokenSelector hdfsTokenSelector =
|
||||
new DelegationTokenSelector();
|
||||
|
||||
public WebHdfsDelegationTokenSelector() {
|
||||
super(TOKEN_KIND);
|
||||
}
|
||||
|
||||
Token<DelegationTokenIdentifier> selectToken(URI nnUri,
|
||||
Collection<Token<?>> tokens, Configuration conf) {
|
||||
Token<DelegationTokenIdentifier> token =
|
||||
selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
|
||||
if (token == null) {
|
||||
token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,5 +13,4 @@
|
||||
#
|
||||
org.apache.hadoop.hdfs.DFSClient$Renewer
|
||||
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
|
||||
org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
|
||||
org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer
|
||||
org.apache.hadoop.hdfs.web.TokenAspect$TokenManager
|
||||
|
@ -22,7 +22,6 @@
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
import java.net.URI;
|
||||
@ -40,6 +39,7 @@
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
public class TestHftpDelegationToken {
|
||||
|
||||
@ -71,9 +71,8 @@ public FileSystem run() throws Exception {
|
||||
});
|
||||
assertSame("wrong kind of file system", HftpFileSystem.class,
|
||||
fs.getClass());
|
||||
Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken");
|
||||
renewToken.setAccessible(true);
|
||||
assertSame("wrong token", token, renewToken.get(fs));
|
||||
assertSame("wrong token", token,
|
||||
Whitebox.getInternalState(fs, "renewToken"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -81,7 +80,7 @@ public void testSelectHftpDelegationToken() throws Exception {
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
|
||||
conf.setClass("fs.hftp.impl", HftpFileSystem.class, FileSystem.class);
|
||||
|
||||
int httpPort = 80;
|
||||
int httpsPort = 443;
|
||||
@ -90,21 +89,21 @@ public void testSelectHftpDelegationToken() throws Exception {
|
||||
|
||||
// test with implicit default port
|
||||
URI fsUri = URI.create("hftp://localhost");
|
||||
MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
HftpFileSystem fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpPort, conf);
|
||||
|
||||
// test with explicit default port
|
||||
// Make sure it uses the port from the hftp URI.
|
||||
fsUri = URI.create("hftp://localhost:"+httpPort);
|
||||
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpPort, conf);
|
||||
|
||||
// test with non-default port
|
||||
// Make sure it uses the port from the hftp URI.
|
||||
fsUri = URI.create("hftp://localhost:"+(httpPort+1));
|
||||
fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
fs = (HftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpPort + 1, conf);
|
||||
|
||||
@ -116,7 +115,7 @@ public void testSelectHsftpDelegationToken() throws Exception {
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class);
|
||||
conf.setClass("fs.hsftp.impl", HsftpFileSystem.class, FileSystem.class);
|
||||
|
||||
int httpPort = 80;
|
||||
int httpsPort = 443;
|
||||
@ -125,19 +124,19 @@ public void testSelectHsftpDelegationToken() throws Exception {
|
||||
|
||||
// test with implicit default port
|
||||
URI fsUri = URI.create("hsftp://localhost");
|
||||
MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort, conf);
|
||||
|
||||
// test with explicit default port
|
||||
fsUri = URI.create("hsftp://localhost:"+httpsPort);
|
||||
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort, conf);
|
||||
|
||||
// test with non-default port
|
||||
fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
|
||||
fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
fs = (HsftpFileSystem) FileSystem.newInstance(fsUri, conf);
|
||||
assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
|
||||
checkTokenSelection(fs, httpsPort+1, conf);
|
||||
|
||||
@ -197,6 +196,9 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
UserGroupInformation ugi =
|
||||
UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
TokenAspect<HftpFileSystem> aspect = (TokenAspect<HftpFileSystem>) Whitebox.getInternalState(fs, "tokenAspect");
|
||||
|
||||
// use ip-based tokens
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(true);
|
||||
|
||||
@ -208,7 +210,7 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
ugi.addToken(hdfsToken);
|
||||
|
||||
// test fallback to hdfs token
|
||||
Token<?> token = fs.selectDelegationToken(ugi);
|
||||
Token<?> token = aspect.selectDelegationToken(ugi);
|
||||
assertNotNull(token);
|
||||
assertEquals(hdfsToken, token);
|
||||
|
||||
@ -217,13 +219,13 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
new byte[0], new byte[0],
|
||||
HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
|
||||
ugi.addToken(hftpToken);
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
token = aspect.selectDelegationToken(ugi);
|
||||
assertNotNull(token);
|
||||
assertEquals(hftpToken, token);
|
||||
|
||||
// switch to using host-based tokens, no token should match
|
||||
SecurityUtilTestHelper.setTokenServiceUseIp(false);
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
token = aspect.selectDelegationToken(ugi);
|
||||
assertNull(token);
|
||||
|
||||
// test fallback to hdfs token
|
||||
@ -232,7 +234,7 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
|
||||
new Text("localhost:8020"));
|
||||
ugi.addToken(hdfsToken);
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
token = aspect.selectDelegationToken(ugi);
|
||||
assertNotNull(token);
|
||||
assertEquals(hdfsToken, token);
|
||||
|
||||
@ -241,36 +243,8 @@ private void checkTokenSelection(HftpFileSystem fs,
|
||||
new byte[0], new byte[0],
|
||||
HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
|
||||
ugi.addToken(hftpToken);
|
||||
token = fs.selectDelegationToken(ugi);
|
||||
token = aspect.selectDelegationToken(ugi);
|
||||
assertNotNull(token);
|
||||
assertEquals(hftpToken, token);
|
||||
}
|
||||
|
||||
static class MyHftpFileSystem extends HftpFileSystem {
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
||||
return super.getCanonicalUri();
|
||||
}
|
||||
@Override
|
||||
public int getDefaultPort() {
|
||||
return super.getDefaultPort();
|
||||
}
|
||||
// don't automatically get a token
|
||||
@Override
|
||||
protected void initDelegationToken() throws IOException {}
|
||||
}
|
||||
|
||||
static class MyHsftpFileSystem extends HsftpFileSystem {
|
||||
@Override
|
||||
public URI getCanonicalUri() {
|
||||
return super.getCanonicalUri();
|
||||
}
|
||||
@Override
|
||||
public int getDefaultPort() {
|
||||
return super.getDefaultPort();
|
||||
}
|
||||
// don't automatically get a token
|
||||
@Override
|
||||
protected void initDelegationToken() throws IOException {}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user