HDFS-2528. Webhdfs: set delegation kind to WEBHDFS and add a HDFS token when http requests are redirected to datanode.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1198903 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-11-07 20:05:16 +00:00
parent 6733a1ca5e
commit a590b498ac
8 changed files with 72 additions and 61 deletions

View File

@ -120,6 +120,9 @@ Release 0.23.1 - UNRELEASED
isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in
HdfsFileStatus JSON object. (szetszwo) HdfsFileStatus JSON object. (szetszwo)
HDFS-2528. Webhdfs: set delegation kind to WEBHDFS and add a HDFS token
when http requests are redirected to datanode. (szetszwo)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -134,7 +134,7 @@ public String toString() {
private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>(); private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
public DelegationTokenRenewer(final Class<?> clazz) { public DelegationTokenRenewer(final Class<T> clazz) {
super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName()); super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
setDaemon(true); setDaemon(true);
} }

View File

@ -58,13 +58,12 @@
import org.apache.hadoop.hdfs.web.resources.DelegationParam; import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.http.HtmlQuoting; import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
@ -546,9 +545,8 @@ public static UserGroupInformation getUGI(ServletContext context,
token.decodeFromUrlString(tokenString); token.decodeFromUrlString(tokenString);
String serviceAddress = getNNServiceAddress(context, request); String serviceAddress = getNNServiceAddress(context, request);
if (serviceAddress != null) { if (serviceAddress != null) {
LOG.info("Setting service in token: "
+ new Text(serviceAddress));
token.setService(new Text(serviceAddress)); token.setService(new Text(serviceAddress));
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
} }
ByteArrayInputStream buf = new ByteArrayInputStream(token ByteArrayInputStream buf = new ByteArrayInputStream(token
.getIdentifier()); .getIdentifier());

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.web.JsonUtil; import org.apache.hadoop.hdfs.web.JsonUtil;
@ -58,7 +59,9 @@
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam; import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam; import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam; import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.hdfs.web.resources.LengthParam; import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam; import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
@ -69,7 +72,9 @@
import org.apache.hadoop.hdfs.web.resources.ReplicationParam; import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam; import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.sun.jersey.spi.container.ResourceFilters; import com.sun.jersey.spi.container.ResourceFilters;
@ -84,6 +89,29 @@ public class DatanodeWebHdfsMethods {
private @Context ServletContext context; private @Context ServletContext context;
private @Context HttpServletResponse response; private @Context HttpServletResponse response;
private void init(final UserGroupInformation ugi, final DelegationParam delegation,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+ ", ugi=" + ugi + Param.toSortedString(", ", parameters));
}
//clear content type
response.setContentType(null);
if (UserGroupInformation.isSecurityEnabled()) {
//add a token for RPC.
final DataNode datanode = (DataNode)context.getAttribute("datanode");
final InetSocketAddress nnRpcAddr = NameNode.getAddress(datanode.getConf());
final Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(delegation.getValue());
SecurityUtil.setTokenService(token, nnRpcAddr);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
ugi.addToken(token);
}
}
/** Handle HTTP PUT request for the root. */ /** Handle HTTP PUT request for the root. */
@PUT @PUT
@Path("/") @Path("/")
@ -92,6 +120,8 @@ public class DatanodeWebHdfsMethods {
public Response putRoot( public Response putRoot(
final InputStream in, final InputStream in,
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op, final PutOpParam op,
@QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT) @QueryParam(PermissionParam.NAME) @DefaultValue(PermissionParam.DEFAULT)
@ -105,7 +135,7 @@ public Response putRoot(
@QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT) @QueryParam(BlockSizeParam.NAME) @DefaultValue(BlockSizeParam.DEFAULT)
final BlockSizeParam blockSize final BlockSizeParam blockSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
return put(in, ugi, ROOT, op, permission, overwrite, bufferSize, return put(in, ugi, delegation, ROOT, op, permission, overwrite, bufferSize,
replication, blockSize); replication, blockSize);
} }
@ -117,6 +147,8 @@ public Response putRoot(
public Response put( public Response put(
final InputStream in, final InputStream in,
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT) @QueryParam(PutOpParam.NAME) @DefaultValue(PutOpParam.DEFAULT)
final PutOpParam op, final PutOpParam op,
@ -132,14 +164,8 @@ public Response put(
final BlockSizeParam blockSize final BlockSizeParam blockSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) { init(ugi, delegation, path, op, permission, overwrite, bufferSize,
LOG.trace(op + ": " + path + ", ugi=" + ugi replication, blockSize);
+ Param.toSortedString(", ", permission, overwrite, bufferSize,
replication, blockSize));
}
//clear content type
response.setContentType(null);
return ugi.doAs(new PrivilegedExceptionAction<Response>() { return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override @Override
@ -193,12 +219,14 @@ public Response run() throws IOException, URISyntaxException {
public Response postRoot( public Response postRoot(
final InputStream in, final InputStream in,
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op, final PostOpParam op,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize final BufferSizeParam bufferSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
return post(in, ugi, ROOT, op, bufferSize); return post(in, ugi, delegation, ROOT, op, bufferSize);
} }
/** Handle HTTP POST request. */ /** Handle HTTP POST request. */
@ -209,6 +237,8 @@ public Response postRoot(
public Response post( public Response post(
final InputStream in, final InputStream in,
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT) @QueryParam(PostOpParam.NAME) @DefaultValue(PostOpParam.DEFAULT)
final PostOpParam op, final PostOpParam op,
@ -216,13 +246,7 @@ public Response post(
final BufferSizeParam bufferSize final BufferSizeParam bufferSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) { init(ugi, delegation, path, op, bufferSize);
LOG.trace(op + ": " + path + ", ugi=" + ugi
+ Param.toSortedString(", ", bufferSize));
}
//clear content type
response.setContentType(null);
return ugi.doAs(new PrivilegedExceptionAction<Response>() { return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override @Override
@ -265,6 +289,8 @@ public Response run() throws IOException {
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON}) @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response getRoot( public Response getRoot(
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op, final GetOpParam op,
@QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT) @QueryParam(OffsetParam.NAME) @DefaultValue(OffsetParam.DEFAULT)
@ -274,7 +300,7 @@ public Response getRoot(
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT) @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
final BufferSizeParam bufferSize final BufferSizeParam bufferSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
return get(ugi, ROOT, op, offset, length, bufferSize); return get(ugi, delegation, ROOT, op, offset, length, bufferSize);
} }
/** Handle HTTP GET request. */ /** Handle HTTP GET request. */
@ -283,6 +309,8 @@ public Response getRoot(
@Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON}) @Produces({MediaType.APPLICATION_OCTET_STREAM, MediaType.APPLICATION_JSON})
public Response get( public Response get(
@Context final UserGroupInformation ugi, @Context final UserGroupInformation ugi,
@QueryParam(DelegationParam.NAME) @DefaultValue(DelegationParam.DEFAULT)
final DelegationParam delegation,
@PathParam(UriFsPathParam.NAME) final UriFsPathParam path, @PathParam(UriFsPathParam.NAME) final UriFsPathParam path,
@QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT) @QueryParam(GetOpParam.NAME) @DefaultValue(GetOpParam.DEFAULT)
final GetOpParam op, final GetOpParam op,
@ -294,13 +322,7 @@ public Response get(
final BufferSizeParam bufferSize final BufferSizeParam bufferSize
) throws IOException, InterruptedException { ) throws IOException, InterruptedException {
if (LOG.isTraceEnabled()) { init(ugi, delegation, path, op, offset, length, bufferSize);
LOG.trace(op + ": " + path + ", ugi=" + ugi
+ Param.toSortedString(", ", offset, length, bufferSize));
}
//clear content type
response.setContentType(null);
return ugi.doAs(new PrivilegedExceptionAction<Response>() { return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override @Override

View File

@ -154,7 +154,7 @@ private Token<? extends TokenIdentifier> generateDelegationToken(
namenode, ugi, renewer != null? renewer: ugi.getShortUserName()); namenode, ugi, renewer != null? renewer: ugi.getShortUserName());
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next(); final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
t.setKind(WebHdfsFileSystem.TOKEN_KIND); t.setKind(WebHdfsFileSystem.TOKEN_KIND);
SecurityUtil.setTokenService(t, namenode.getNameNodeAddress()); SecurityUtil.setTokenService(t, namenode.getHttpAddress());
return t; return t;
} }

View File

@ -114,17 +114,24 @@ public class WebHdfsFileSystem extends FileSystem
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator(); private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
/** Delegation token kind */ /** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */
public static final AbstractDelegationTokenSelector<DelegationTokenIdentifier> DT_SELECTOR
= new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(TOKEN_KIND) {};
private static final DelegationTokenRenewer<WebHdfsFileSystem> dtRenewer private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
= new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
static { private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
dtRenewer.start(); if (DT_RENEWER == null) {
DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
DT_RENEWER.start();
}
DT_RENEWER.addRenewAction(webhdfs);
} }
private final UserGroupInformation ugi; private final UserGroupInformation ugi;
private InetSocketAddress nnAddr; private InetSocketAddress nnAddr;
private Token<?> delegationToken; private Token<?> delegationToken;
private Token<?> renewToken;
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token(); private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
private Path workingDir; private Path workingDir;
@ -153,8 +160,7 @@ public synchronized void initialize(URI uri, Configuration conf
protected void initDelegationToken() throws IOException { protected void initDelegationToken() throws IOException {
// look for webhdfs token, then try hdfs // look for webhdfs token, then try hdfs
final Text serviceName = SecurityUtil.buildTokenService(nnAddr); final Text serviceName = SecurityUtil.buildTokenService(nnAddr);
Token<?> token = webhdfspTokenSelector.selectToken( Token<?> token = DT_SELECTOR.selectToken(serviceName, ugi.getTokens());
serviceName, ugi.getTokens());
if (token == null) { if (token == null) {
token = DelegationTokenSelector.selectHdfsDelegationToken( token = DelegationTokenSelector.selectHdfsDelegationToken(
nnAddr, ugi, getConf()); nnAddr, ugi, getConf());
@ -171,7 +177,7 @@ protected void initDelegationToken() throws IOException {
if (token != null) { if (token != null) {
setDelegationToken(token); setDelegationToken(token);
if (createdToken) { if (createdToken) {
dtRenewer.addRenewAction(this); addRenewAction(this);
LOG.debug("Created new DT for " + token.getService()); LOG.debug("Created new DT for " + token.getService());
} else { } else {
LOG.debug("Found existing DT for " + token.getService()); LOG.debug("Found existing DT for " + token.getService());
@ -652,23 +658,14 @@ public List<Token<?>> getDelegationTokens(final String renewer
@Override @Override
public Token<?> getRenewToken() { public Token<?> getRenewToken() {
return renewToken; return delegationToken;
} }
@Override @Override
public <T extends TokenIdentifier> void setDelegationToken( public <T extends TokenIdentifier> void setDelegationToken(
final Token<T> token) { final Token<T> token) {
synchronized(this) { synchronized(this) {
renewToken = token; delegationToken = token;
// emulate the 203 usage of the tokens
// by setting the kind and service as if they were hdfs tokens
delegationToken = new Token<T>(token);
// NOTE: the remote nn must be configured to use hdfs
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
// no need to change service because we aren't exactly sure what it
// should be. we can guess, but it might be wrong if the local conf
// value is incorrect. the service is a client side field, so the remote
// end does not care about the value
} }
} }
@ -728,15 +725,6 @@ public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
return JsonUtil.toMD5MD5CRC32FileChecksum(m); return JsonUtil.toMD5MD5CRC32FileChecksum(m);
} }
private static final DtSelector webhdfspTokenSelector = new DtSelector();
private static class DtSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
private DtSelector() {
super(TOKEN_KIND);
}
}
/** Delegation token renewer. */ /** Delegation token renewer. */
public static class DtRenewer extends TokenRenewer { public static class DtRenewer extends TokenRenewer {
@Override @Override

View File

@ -53,7 +53,7 @@ public UserGroupInformation getValue(final HttpContext context) {
return JspHelper.getUGI(servletcontext, request, conf, return JspHelper.getUGI(servletcontext, request, conf,
AuthenticationMethod.KERBEROS, false); AuthenticationMethod.KERBEROS, false);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new SecurityException("Failed to obtain user group information.", e);
} }
} }

View File

@ -75,7 +75,7 @@ public void testDelegationTokenInUrl() throws IOException {
+ "&token=" + tokenString, renewTokenUrl.getQuery()); + "&token=" + tokenString, renewTokenUrl.getQuery());
Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>( Token<DelegationTokenIdentifier> delegationToken = new Token<DelegationTokenIdentifier>(
token); token);
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); delegationToken.setKind(WebHdfsFileSystem.TOKEN_KIND);
Assert.assertEquals( Assert.assertEquals(
generateUrlQueryPrefix(PutOpParam.Op.CANCELDELEGATIONTOKEN, generateUrlQueryPrefix(PutOpParam.Op.CANCELDELEGATIONTOKEN,
ugi.getUserName()) ugi.getUserName())