From 002dd6968b89ded6a77858ccb50c9b2df074c226 Mon Sep 17 00:00:00 2001 From: Jitendra Nath Pandey Date: Fri, 14 Oct 2011 01:24:20 +0000 Subject: [PATCH] MAPREDUCE-2764. Fix renewal of dfs delegation tokens. Contributed by Owen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1183187 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 2 + .../apache/hadoop/security/SecurityUtil.java | 21 ++ .../hadoop/security/UserGroupInformation.java | 17 ++ .../apache/hadoop/security/token/Token.java | 116 +++++++- .../hadoop/security/token/TokenRenewer.java | 69 +++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../main/java/org/apache/hadoop/fs/Hdfs.java | 2 + .../org/apache/hadoop/hdfs/DFSClient.java | 78 +++++- .../hadoop/hdfs/DistributedFileSystem.java | 17 +- .../apache/hadoop/hdfs/HftpFileSystem.java | 156 +++++++---- .../token/block/BlockTokenIdentifier.java | 9 + .../hdfs/tools/DelegationTokenFetcher.java | 88 +++--- ....apache.hadoop.security.token.TokenRenewer | 3 + .../hadoop/fs/TestResolveHdfsSymlink.java | 2 +- .../hdfs/security/TestDelegationToken.java | 1 + .../namenode/OfflineEditsViewerHelper.java | 8 +- .../tools/TestDelegationTokenFetcher.java | 61 ++++- ....apache.hadoop.security.token.TokenRenewer | 1 + hadoop-mapreduce-project/CHANGES.txt | 2 + .../org/apache/hadoop/mapred/JobClient.java | 38 ++- .../org/apache/hadoop/mapreduce/Cluster.java | 2 + .../token/DelegationTokenRenewal.java | 255 ++++-------------- .../security/token/JobTokenIdentifier.java | 11 +- .../delegation/DelegationTokenIdentifier.java | 2 +- ....apache.hadoop.security.token.TokenRenewer | 2 + .../security/ApplicationTokenIdentifier.java | 9 + .../security/ContainerTokenIdentifier.java | 10 + ....apache.hadoop.security.token.TokenRenewer | 2 + ....apache.hadoop.security.token.TokenRenewer | 1 + .../token/TestDelegationTokenRenewal.java | 126 +++++---- 30 files changed, 731 insertions(+), 382 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/TokenRenewer.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer create mode 100644 hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 6a9117fdbc..66e6d53933 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -81,6 +81,8 @@ Trunk (unreleased changes) HADOOP-7721. Add log before login in KerberosAuthenticationHandler. (jitendra) + MAPREDUCE-2764. Fix renewal of dfs delegation tokens. (Owen via jitendra) + Release 0.23.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java index cbf408981c..58f8bde466 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SecurityUtil.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; import java.net.UnknownHostException; @@ -34,7 +35,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenInfo; import sun.security.jgss.krb5.Krb5Util; @@ -352,4 +355,22 @@ public static TokenInfo getTokenInfo(Class protocol, Configuration conf) { return null; } + /** + * Set the given token's service to the format expected by the RPC client + * @param token a delegation token + * @param addr the socket for the rpc connection + */ + public static void setTokenService(Token token, InetSocketAddress addr) { + token.setService(buildTokenService(addr)); + } + + /** + * Construct the service key for a token + * @param addr InetSocketAddress of remote connection with a token + * @return "ip:port" + */ + public static Text buildTokenService(InetSocketAddress addr) { + String host = addr.getAddress().getHostAddress(); + return new Text(host + ":" + addr.getPort()); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 83f856d8f3..a13e775d54 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -634,6 +634,23 @@ static void loginUserFromKeytab(String user, + " using keytab file " + keytabFile); } + /** + * Re-login a user from keytab if TGT is expired or is close to expiry. + * + * @throws IOException + */ + public synchronized void checkTGTAndReloginFromKeytab() throws IOException { + if (!isSecurityEnabled() + || user.getAuthenticationMethod() != AuthenticationMethod.KERBEROS + || !isKeytab) + return; + KerberosTicket tgt = getTGT(); + if (tgt != null && System.currentTimeMillis() < getRefreshTime(tgt)) { + return; + } + reloginFromKeytab(); + } + /** * Re-Login a user in from a keytab file. Loads a user identity from a keytab * file and logs them in. They become the currently logged-in user. This diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java index d47f99429f..32383da9a0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java @@ -22,11 +22,15 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.ServiceLoader; import org.apache.commons.codec.binary.Base64; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -40,10 +44,12 @@ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceStability.Evolving public class Token implements Writable { + public static final Log LOG = LogFactory.getLog(Token.class); private byte[] identifier; private byte[] password; private Text kind; private Text service; + private TokenRenewer renewer; /** * Construct a token given a token identifier and a secret manager for the @@ -82,6 +88,17 @@ public Token() { service = new Text(); } + /** + * Clone a token. + * @param other the token to clone + */ + public Token(Token other) { + this.identifier = other.identifier; + this.password = other.password; + this.kind = other.kind; + this.service = other.service; + } + /** * Get the token identifier * @return the token identifier @@ -106,6 +123,17 @@ public Text getKind() { return kind; } + /** + * Set the token kind. This is only intended to be used by services that + * wrap another service's token, such as HFTP wrapping HDFS. + * @param newKind + */ + @InterfaceAudience.Private + public synchronized void setKind(Text newKind) { + kind = newKind; + renewer = null; + } + /** * Get the service on which the token is supposed to be used * @return the service name @@ -244,4 +272,90 @@ public String toString() { buffer.append(service.toString()); return buffer.toString(); } + + private static ServiceLoader renewers = + ServiceLoader.load(TokenRenewer.class); + + private synchronized TokenRenewer getRenewer() throws IOException { + if (renewer != null) { + return renewer; + } + renewer = TRIVIAL_RENEWER; + for (TokenRenewer canidate: renewers) { + if (canidate.handleKind(this.kind)) { + renewer = canidate; + return renewer; + } + } + LOG.warn("No TokenRenewer defined for token kind " + this.kind); + return renewer; + } + + /** + * Is this token managed so that it can be renewed or cancelled? + * @return true, if it can be renewed and cancelled. + */ + public boolean isManaged() throws IOException { + return getRenewer().isManaged(this); + } + + /** + * Renew this delegation token + * @return the new expiration time + * @throws IOException + * @throws InterruptedException + */ + public long renew(Configuration conf + ) throws IOException, InterruptedException { + return getRenewer().renew(this, conf); + } + + /** + * Cancel this delegation token + * @throws IOException + * @throws InterruptedException + */ + public void cancel(Configuration conf + ) throws IOException, InterruptedException { + getRenewer().cancel(this, conf); + } + + /** + * A trivial renewer for token kinds that aren't managed. Sub-classes need + * to implement getKind for their token kind. + */ + @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) + @InterfaceStability.Evolving + public static class TrivialRenewer extends TokenRenewer { + + // define the kind for this renewer + protected Text getKind() { + return null; + } + + @Override + public boolean handleKind(Text kind) { + return kind.equals(getKind()); + } + + @Override + public boolean isManaged(Token token) { + return false; + } + + @Override + public long renew(Token token, Configuration conf) { + throw new UnsupportedOperationException("Token renewal is not supported "+ + " for " + token.kind + " tokens"); + } + + @Override + public void cancel(Token token, Configuration conf) throws IOException, + InterruptedException { + throw new UnsupportedOperationException("Token cancel is not supported " + + " for " + token.kind + " tokens"); + } + + } + private static final TokenRenewer TRIVIAL_RENEWER = new TrivialRenewer(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/TokenRenewer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/TokenRenewer.java new file mode 100644 index 0000000000..fbd3c93516 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/TokenRenewer.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.security.token; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; + +/** + * This is the interface for plugins that handle tokens. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) +@InterfaceStability.Evolving +public abstract class TokenRenewer { + + /** + * Does this renewer handle this kind of token? + * @param kind the kind of the token + * @return true if this renewer can renew it + */ + public abstract boolean handleKind(Text kind); + + /** + * Is the given token managed? Only managed tokens may be renewed or + * cancelled. + * @param token the token being checked + * @return true if the token may be renewed or cancelled + * @throws IOException + */ + public abstract boolean isManaged(Token token) throws IOException; + + /** + * Renew the given token. + * @return the new expiration time + * @throws IOException + * @throws InterruptedException + */ + public abstract long renew(Token token, + Configuration conf + ) throws IOException, InterruptedException; + + /** + * Cancel the given token + * @throws IOException + * @throws InterruptedException + */ + public abstract void cancel(Token token, + Configuration conf + ) throws IOException, InterruptedException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8d09492014..1371719620 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -123,6 +123,8 @@ Trunk (unreleased changes) HDFS-2424. Added a root element "HdfsFileStatuses" for the response of webhdfs listStatus. (szetszwo) + MAPREDUCE-2764. Fix renewal of dfs delegation tokens. (Owen via jitendra) + Release 0.23.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java index 7772ad9792..5a45f51ee5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -409,6 +409,7 @@ public List> getDelegationTokens(String renewer) throws IOException { * @return the new expiration time * @throws InvalidToken * @throws IOException + * @deprecated Use Token.renew instead. */ @SuppressWarnings("unchecked") public long renewDelegationToken( @@ -423,6 +424,7 @@ public long renewDelegationToken( * @param token delegation token * @throws InvalidToken * @throws IOException + * @deprecated Use Token.cancel instead. */ @SuppressWarnings("unchecked") public void cancelDelegationToken( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 0e34dae9a8..65a9faeff4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -93,9 +93,11 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.Progressable; /******************************************************** @@ -115,6 +117,7 @@ public class DFSClient implements java.io.Closeable { public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB final ClientProtocol namenode; + private final InetSocketAddress nnAddress; final UserGroupInformation ugi; volatile boolean clientRunning = true; private volatile FsServerDefaults serverDefaults; @@ -241,6 +244,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, this.dfsClientConf = new Conf(conf); this.conf = conf; this.stats = stats; + this.nnAddress = nameNodeAddr; this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); @@ -442,18 +446,26 @@ public Token getDelegationToken(Text renewer) throws IOException { Token result = namenode.getDelegationToken(renewer); + SecurityUtil.setTokenService(result, nnAddress); LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(result)); return result; } /** - * @see ClientProtocol#renewDelegationToken(Token) + * Renew a delegation token + * @param token the token to renew + * @return the new expiration time + * @throws InvalidToken + * @throws IOException + * @deprecated Use Token.renew instead. */ public long renewDelegationToken(Token token) throws InvalidToken, IOException { LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token)); try { - return namenode.renewDelegationToken(token); + return token.renew(conf); + } catch (InterruptedException ie) { + throw new RuntimeException("caught interrupted", ie); } catch (RemoteException re) { throw re.unwrapRemoteException(InvalidToken.class, AccessControlException.class); @@ -461,19 +473,77 @@ public long renewDelegationToken(Token token) } /** - * @see ClientProtocol#cancelDelegationToken(Token) + * Cancel a delegation token + * @param token the token to cancel + * @throws InvalidToken + * @throws IOException + * @deprecated Use Token.cancel instead. */ public void cancelDelegationToken(Token token) throws InvalidToken, IOException { LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token)); try { - namenode.cancelDelegationToken(token); + token.cancel(conf); + } catch (InterruptedException ie) { + throw new RuntimeException("caught interrupted", ie); } catch (RemoteException re) { throw re.unwrapRemoteException(InvalidToken.class, AccessControlException.class); } } + @InterfaceAudience.Private + public static class Renewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind); + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token token, Configuration conf) throws IOException { + Token delToken = + (Token) token; + LOG.info("Renewing " + + DelegationTokenIdentifier.stringifyToken(delToken)); + ClientProtocol nn = + DFSUtil.createNamenode + (NameNode.getAddress(token.getService().toString()), + conf, UserGroupInformation.getCurrentUser()); + try { + return nn.renewDelegationToken(delToken); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token token, Configuration conf) throws IOException { + Token delToken = + (Token) token; + LOG.info("Cancelling " + + DelegationTokenIdentifier.stringifyToken(delToken)); + ClientProtocol nn = DFSUtil.createNamenode( + NameNode.getAddress(token.getService().toString()), conf, + UserGroupInformation.getCurrentUser()); + try { + nn.cancelDelegationToken(delToken); + } catch (RemoteException re) { + throw re.unwrapRemoteException(InvalidToken.class, + AccessControlException.class); + } + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + } + /** * Report corrupt blocks that were discovered by the client. * @see ClientProtocol#reportBadBlocks(LocatedBlock[]) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 4d12efe5fc..4de8fe4fd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -811,7 +811,6 @@ Token getDelegationToken(String renewer ) throws IOException { Token result = dfs.getDelegationToken(renewer == null ? null : new Text(renewer)); - result.setService(new Text(getCanonicalServiceName())); return result; } @@ -831,7 +830,7 @@ Token getDelegationToken(String renewer @Deprecated public Token getDelegationToken(Text renewer) throws IOException { - return dfs.getDelegationToken(renewer); + return getDelegationToken(renewer.toString()); } @Override // FileSystem @@ -848,10 +847,15 @@ public List> getDelegationTokens(String renewer) throws IOException { * @param token delegation token obtained earlier * @return the new expiration time * @throws IOException + * @deprecated Use Token.renew instead. */ public long renewDelegationToken(Token token) throws InvalidToken, IOException { - return dfs.renewDelegationToken(token); + try { + return token.renew(getConf()); + } catch (InterruptedException ie) { + throw new RuntimeException("Caught interrupted", ie); + } } /** @@ -859,10 +863,15 @@ public long renewDelegationToken(Token token) * * @param token delegation token * @throws IOException + * @deprecated Use Token.cancel instead. */ public void cancelDelegationToken(Token token) throws IOException { - dfs.cancelDelegationToken(token); + try { + token.cancel(getConf()); + } catch (InterruptedException ie) { + throw new RuntimeException("Caught interrupted", ie); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java index d049dd2b6f..fedc69a51c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java @@ -60,6 +60,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ServletUtil; import org.xml.sax.Attributes; @@ -83,14 +84,18 @@ public class HftpFileSystem extends FileSystem { HttpURLConnection.setFollowRedirects(true); } + public static final Text TOKEN_KIND = new Text("HFTP delegation"); + private String nnHttpUrl; - private URI hdfsURI; + private Text hdfsServiceName; + private URI hftpURI; protected InetSocketAddress nnAddr; protected UserGroupInformation ugi; public static final String HFTP_TIMEZONE = "UTC"; public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; - private Token delegationToken; + private Token delegationToken; + private Token renewToken; public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_"; public static final SimpleDateFormat getDateFormat() { @@ -118,7 +123,7 @@ protected int getDefaultPort() { @Override public String getCanonicalServiceName() { - return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort()); + return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort()); } private String buildUri(String schema, String host, int port) { @@ -144,17 +149,21 @@ public void initialize(final URI name, final Configuration conf) urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY, DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT); - nnHttpUrl = - buildUri("https://", NetUtils.normalizeHostName(name.getHost()), urlPort); + String normalizedNN = NetUtils.normalizeHostName(name.getHost()); + nnHttpUrl = buildUri("https://", normalizedNN ,urlPort); LOG.debug("using url to get DT:" + nnHttpUrl); + try { + hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort)); + } catch (URISyntaxException ue) { + throw new IOException("bad uri for hdfs", ue); + } - - // if one uses RPC port different from the Default one, // one should specify what is the setvice name for this delegation token // otherwise it is hostname:RPC_PORT - String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+ - SecurityUtil.buildDTServiceName(name, DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT); + String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY + + SecurityUtil.buildDTServiceName(name, + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT); if(LOG.isDebugEnabled()) { LOG.debug("Trying to find DT for " + name + " using key=" + key + "; conf=" + conf.get(key, "")); @@ -165,9 +174,10 @@ public void initialize(final URI name, final Configuration conf) nnPort = NetUtils.createSocketAddr(nnServiceName, NameNode.DEFAULT_PORT).getPort(); } - try { - hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort)); + URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort); + hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI, + nnPort)); } catch (URISyntaxException ue) { throw new IOException("bad uri for hdfs", ue); } @@ -175,30 +185,55 @@ public void initialize(final URI name, final Configuration conf) if (UserGroupInformation.isSecurityEnabled()) { //try finding a token for this namenode (esp applicable for tasks //using hftp). If there exists one, just set the delegationField - String canonicalName = getCanonicalServiceName(); + String hftpServiceName = getCanonicalServiceName(); for (Token t : ugi.getTokens()) { - if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) && - t.getService().toString().equals(canonicalName)) { - if(LOG.isDebugEnabled()) { - LOG.debug("Found existing DT for " + name); + Text kind = t.getKind(); + if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) { + if (t.getService().toString().equals(hdfsServiceName)) { + setDelegationToken(t); + break; + } + } else if (TOKEN_KIND.equals(kind)) { + if (hftpServiceName + .equals(normalizeService(t.getService().toString()))) { + setDelegationToken(t); + break; } - delegationToken = (Token) t; - break; } } //since we don't already have a token, go get one over https if (delegationToken == null) { - delegationToken = - (Token) getDelegationToken(null); + setDelegationToken(getDelegationToken(null)); renewer.addTokenToRenew(this); } } } - + + private String normalizeService(String service) { + int colonIndex = service.indexOf(':'); + if (colonIndex == -1) { + throw new IllegalArgumentException("Invalid service for hftp token: " + + service); + } + String hostname = + NetUtils.normalizeHostName(service.substring(0, colonIndex)); + String port = service.substring(colonIndex + 1); + return hostname + ":" + port; + } + + private void setDelegationToken(Token token) { + renewToken = token; + // emulate the 203 usage of the tokens + // by setting the kind and service as if they were hdfs tokens + delegationToken = new Token(token); + delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); + delegationToken.setService(hdfsServiceName); + } @Override - public synchronized Token getDelegationToken(final String renewer) throws IOException { + public synchronized Token getDelegationToken(final String renewer + ) throws IOException { try { //Renew TGT if needed ugi.reloginFromKeytab(); @@ -221,7 +256,6 @@ public Token run() throws IOException { LOG.debug("Got dt for " + getUri() + ";t.service=" +t.getService()); } - t.setService(new Text(getCanonicalServiceName())); return t; } return null; @@ -625,7 +659,8 @@ public long getDelay(TimeUnit unit) { @Override public int compareTo(Delayed o) { if (o.getClass() != RenewAction.class) { - throw new IllegalArgumentException("Illegal comparision to non-RenewAction"); + throw new IllegalArgumentException + ("Illegal comparision to non-RenewAction"); } RenewAction other = (RenewAction) o; return timestamp < other.timestamp ? -1 : @@ -662,31 +697,20 @@ public void setNewTime(long newTime) { * @return * @throws IOException */ - @SuppressWarnings("unchecked") public boolean renew() throws IOException, InterruptedException { final HftpFileSystem fs = weakFs.get(); if (fs != null) { synchronized (fs) { - fs.ugi.reloginFromKeytab(); - fs.ugi.doAs(new PrivilegedExceptionAction() { - - @Override - public Void run() throws Exception { - try { - DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl, - fs.delegationToken); - } catch (IOException ie) { - try { - fs.delegationToken = - (Token) fs.getDelegationToken(null); - } catch (IOException ie2) { - throw new IOException("Can't renew or get new delegation token ", - ie); - } - } - return null; - } - }); + try { + fs.renewToken.renew(fs.getConf()); + } catch (IOException ie) { + try { + fs.setDelegationToken(fs.getDelegationToken(null)); + } catch (IOException ie2) { + throw new IOException("Can't renew or get new delegation " + + "token ", ie); + } + } } } return fs != null; @@ -722,7 +746,7 @@ public RenewerThread() { } public void addTokenToRenew(HftpFileSystem fs) { - queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs)); + queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(), fs)); } public void run() { @@ -747,4 +771,44 @@ public void run() { } } } + + @InterfaceAudience.Private + public static class TokenManager extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return kind.equals(TOKEN_KIND); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token token, + Configuration conf) throws IOException { + // update the kerberos credentials, if they are coming from a keytab + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + // use https to renew the token + return + DelegationTokenFetcher.renewDelegationToken + ("https://" + token.getService().toString(), + (Token) token); + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token token, + Configuration conf) throws IOException { + // update the kerberos credentials, if they are coming from a keytab + UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); + // use https to cancel the token + DelegationTokenFetcher.cancelDelegationToken + ("https://" + token.getService().toString(), + (Token) token); + } + + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java index 18c2ef2ea1..c1fd3f9f82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @InterfaceAudience.Private @@ -171,4 +172,12 @@ public byte[] getBytes() { return cache; } + + @InterfaceAudience.Private + public static class Renewer extends Token.TrivialRenewer { + @Override + protected Text getKind() { + return KIND_NAME; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java index 1e85393343..eb8af25d26 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java @@ -39,14 +39,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.HftpFileSystem; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -149,34 +152,31 @@ public Object run() throws Exception { DataInputStream in = new DataInputStream( new ByteArrayInputStream(token.getIdentifier())); id.readFields(in); - if(LOG.isDebugEnabled()) { - LOG.debug("Token (" + id + ") for " + token.getService()); + System.out.println("Token (" + id + ") for " + + token.getService()); + } + } else if (cancel) { + for(Token token: readTokens(tokenFile, conf)) { + if (token.isManaged()) { + token.cancel(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Cancelled token for " + token.getService()); + } } } - return null; - } - - if (webUrl != null) { - if (renew) { - long result; - for (Token token : readTokens(tokenFile, conf)) { - result = renewDelegationToken(webUrl, - (Token) token); - if(LOG.isDebugEnabled()) { - LOG.debug("Renewed token via " + webUrl + " for " - + token.getService() + " until: " + new Date(result)); + } else if (renew) { + for (Token token : readTokens(tokenFile, conf)) { + if (token.isManaged()) { + long result = token.renew(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Renewed token for " + token.getService() + + " until: " + new Date(result)); } } - } else if (cancel) { - for (Token token : readTokens(tokenFile, conf)) { - cancelDelegationToken(webUrl, - (Token) token); - if(LOG.isDebugEnabled()) { - LOG.debug("Cancelled token via " + webUrl + " for " - + token.getService()); - } - } - } else { + } + } else { + // otherwise we are fetching + if (webUrl != null) { Credentials creds = getDTfromRemote(webUrl, renewer); creds.writeTokenStorageFile(tokenFile, conf); for (Token token : creds.getAllTokens()) { @@ -185,29 +185,8 @@ public Object run() throws Exception { + token.getService() + " into " + tokenFile); } } - } - } else { - FileSystem fs = FileSystem.get(conf); - if (cancel) { - for (Token token : readTokens(tokenFile, conf)) { - ((DistributedFileSystem) fs) - .cancelDelegationToken((Token) token); - if(LOG.isDebugEnabled()) { - LOG.debug("Cancelled token for " - + token.getService()); - } - } - } else if (renew) { - long result; - for (Token token : readTokens(tokenFile, conf)) { - result = ((DistributedFileSystem) fs) - .renewDelegationToken((Token) token); - if(LOG.isDebugEnabled()) { - LOG.debug("Renewed token for " + token.getService() - + " until: " + new Date(result)); - } - } } else { + FileSystem fs = FileSystem.get(conf); Token token = fs.getDelegationToken(renewer); Credentials cred = new Credentials(); cred.addToken(token.getService(), token); @@ -230,8 +209,9 @@ static public Credentials getDTfromRemote(String nnAddr, try { StringBuffer url = new StringBuffer(); if (renewer != null) { - url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC).append("?"). - append(GetDelegationTokenServlet.RENEWER).append("=").append(renewer); + url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC) + .append("?").append(GetDelegationTokenServlet.RENEWER).append("=") + .append(renewer); } else { url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC); } @@ -248,6 +228,12 @@ static public Credentials getDTfromRemote(String nnAddr, Credentials ts = new Credentials(); dis = new DataInputStream(in); ts.readFields(dis); + for(Token token: ts.getAllTokens()) { + token.setKind(HftpFileSystem.TOKEN_KIND); + token.setService(new Text(SecurityUtil.buildDTServiceName + (remoteURL.toURI(), + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT))); + } return ts; } catch (Exception e) { throw new IOException("Unable to obtain remote token", e); @@ -295,7 +281,8 @@ static public long renewDelegationToken(String nnAddr, IOUtils.cleanup(LOG, in); if(e!=null) { - LOG.info("rethrowing exception from HTTP request: " + e.getLocalizedMessage()); + LOG.info("rethrowing exception from HTTP request: " + + e.getLocalizedMessage()); throw e; } throw ie; @@ -383,7 +370,8 @@ static public void cancelDelegationToken(String nnAddr, IOUtils.cleanup(LOG, in); if(e!=null) { - LOG.info("rethrowing exception from HTTP request: " + e.getLocalizedMessage()); + LOG.info("rethrowing exception from HTTP request: " + + e.getLocalizedMessage()); throw e; } throw ie; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..50402bbb64 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1,3 @@ +org.apache.hadoop.hdfs.DFSClient$Renewer +org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer +org.apache.hadoop.hdfs.HftpFileSystem$TokenManager \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java index 3b67f1b4d3..17608ac1f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestResolveHdfsSymlink.java @@ -105,7 +105,7 @@ public void testFcResolveAfs() throws IOException, InterruptedException { * @throws IOException * @throws InterruptedException */ - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void testFcDelegationToken() throws UnsupportedFileSystemException, IOException, InterruptedException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java index 9c577f740e..ef14fda6f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationToken.java @@ -183,6 +183,7 @@ public WebHdfsFileSystem run() throws Exception { dtSecretManager.renewToken(token, "JobTracker"); } + @SuppressWarnings("deprecation") @Test public void testDelegationTokenWithDoAs() throws Exception { final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java index 3fca8a3808..e22fa29927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java @@ -203,11 +203,9 @@ private CheckpointSignature runOperations() throws IOException { "JobTracker/foo.com@FOO.COM"); try { longUgi.doAs(new PrivilegedExceptionAction() { - public Object run() throws IOException { - final DistributedFileSystem dfs = - (DistributedFileSystem) cluster.getFileSystem(); - dfs.renewDelegationToken(token); - dfs.cancelDelegationToken(token); + public Object run() throws IOException, InterruptedException { + token.renew(config); + token.cancel(config); return null; } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenFetcher.java index f708c3e293..3832aa0735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenFetcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenFetcher.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; @@ -37,7 +36,9 @@ import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ public class TestDelegationTokenFetcher { private Configuration conf; private URI uri; private static final String SERVICE_VALUE = "localhost:2005"; + private static final Text KIND = new Text("TESTING-TOKEN-KIND"); private static String tokenFile = "file.dta"; @Before @@ -56,25 +58,59 @@ public void init() throws URISyntaxException, IOException { FileSystemTestHelper.addFileSystemForTesting(uri, conf, dfs); } + public static class FakeRenewer extends TokenRenewer { + static Token lastRenewed = null; + static Token lastCanceled = null; + + @Override + public boolean handleKind(Text kind) { + return KIND.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token token, Configuration conf) { + lastRenewed = token; + return 0; + } + + @Override + public void cancel(Token token, Configuration conf) { + lastCanceled = token; + } + + public static void reset() { + lastRenewed = null; + lastCanceled = null; + } + } + /** * Verify that when the DelegationTokenFetcher runs, it talks to the Namenode, * pulls out the correct user's token and successfully serializes it to disk. */ + @SuppressWarnings("deprecation") @Test public void expectedTokenIsRetrievedFromDFS() throws Exception { final byte[] ident = new DelegationTokenIdentifier(new Text("owner"), new Text("renewer"), new Text("realuser")).getBytes(); final byte[] pw = new byte[] { 42 }; - final Text kind = new Text("MY-KIND"); final Text service = new Text(uri.toString()); + final String user = + UserGroupInformation.getCurrentUser().getShortUserName(); // Create a token for the fetcher to fetch, wire NN to return it when asked // for this particular user. - Token t = new Token( - ident, pw, kind, service); - when(dfs.getDelegationToken((String) null)).thenReturn(t); + Token t = + new Token(ident, pw, KIND, service); + when(dfs.getDelegationToken(eq((String) null))).thenReturn(t); when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L); when(dfs.getUri()).thenReturn(uri); + FakeRenewer.reset(); FileSystem fileSys = FileSystem.getLocal(conf); try { @@ -88,14 +124,13 @@ public void expectedTokenIsRetrievedFromDFS() throws Exception { assertEquals(t, itr.next()); assertTrue(!itr.hasNext()); - DelegationTokenFetcher.main(new String[] { "-fs", uri.toString(), - "--print", tokenFile }); - DelegationTokenFetcher.main(new String[] { "-fs", uri.toString(), - "--renew", tokenFile }); - DelegationTokenFetcher.main(new String[] { "-fs", uri.toString(), - "--cancel", tokenFile }); - verify(dfs).renewDelegationToken(eq(t)); - verify(dfs).cancelDelegationToken(eq(t)); + DelegationTokenFetcher.main(new String[] { "--print", tokenFile }); + DelegationTokenFetcher.main(new String[] { "--renew", tokenFile }); + assertEquals(t, FakeRenewer.lastRenewed); + FakeRenewer.reset(); + + DelegationTokenFetcher.main(new String[] { "--cancel", tokenFile }); + assertEquals(t, FakeRenewer.lastCanceled); } finally { fileSys.delete(new Path(tokenFile), true); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..568cc80764 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1 @@ +org.apache.hadoop.tools.TestDelegationTokenFetcher$FakeRenewer diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 7f35debfc9..9744e872cc 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -48,6 +48,8 @@ Trunk (unreleased changes) MAPREDUCE-3183. hadoop-assemblies/src/main/resources/assemblies/hadoop-mapreduce-dist.xml missing license header. (Hitesh Shah via tucu). + MAPREDUCE-2764. Fix renewal of dfs delegation tokens. (Owen via jitendra) + Release 0.23.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index 9382dc4a97..6021609893 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -459,6 +460,37 @@ public void init(JobConf conf) throws IOException { cluster = new Cluster(conf); } + @InterfaceAudience.Private + public static class Renewer extends TokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind); + } + + @SuppressWarnings("unchecked") + @Override + public long renew(Token token, Configuration conf + ) throws IOException, InterruptedException { + return new Cluster(conf). + renewDelegationToken((Token) token); + } + + @SuppressWarnings("unchecked") + @Override + public void cancel(Token token, Configuration conf + ) throws IOException, InterruptedException { + new Cluster(conf). + cancelDelegationToken((Token) token); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + } + /** * Build a job client, connect to the indicated job tracker. * @@ -1048,22 +1080,24 @@ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException { * @return true if the renewal went well * @throws InvalidToken * @throws IOException + * @deprecated Use {@link Token.renew} instead */ public long renewDelegationToken(Token token ) throws InvalidToken, IOException, InterruptedException { - return cluster.renewDelegationToken(token); + return token.renew(getConf()); } /** * Cancel a delegation token from the JobTracker * @param token the token to cancel * @throws IOException + * @deprecated Use {@link Token.cancel} instead */ public void cancelDelegationToken(Token token ) throws InvalidToken, IOException, InterruptedException { - cluster.cancelDelegationToken(token); + token.cancel(getConf()); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java index 33d5f81b4f..f950fe0e9d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java @@ -371,6 +371,7 @@ public long getTaskTrackerExpiryInterval() throws IOException, * @return the new expiration time * @throws InvalidToken * @throws IOException + * @deprecated Use {@link Token.renew} instead */ public long renewDelegationToken(Token token ) throws InvalidToken, IOException, @@ -387,6 +388,7 @@ public long renewDelegationToken(Token token * Cancel a delegation token from the JobTracker * @param token the token to cancel * @throws IOException + * @deprecated Use {@link Token.cancel} instead */ public void cancelDelegationToken(Token token ) throws IOException, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java index 9e96b55ccb..e4675b523a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java @@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.security.token; import java.io.IOException; -import java.net.InetAddress; -import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.Collection; import java.util.Collections; @@ -37,18 +35,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.util.StringUtils; @@ -64,14 +54,14 @@ public class DelegationTokenRenewal { * */ private static class DelegationTokenToRenew { - public final Token token; + public final Token token; public final JobID jobId; public final Configuration conf; public long expirationDate; public TimerTask timerTask; public DelegationTokenToRenew( - JobID jId, Token t, + JobID jId, Token t, Configuration newConf, long newExpirationDate) { token = t; jobId = jId; @@ -124,10 +114,9 @@ public int hashCode() { private static class DelegationTokenCancelThread extends Thread { private static class TokenWithConf { - Token token; + Token token; Configuration conf; - TokenWithConf(Token token, - Configuration conf) { + TokenWithConf(Token token, Configuration conf) { this.token = token; this.conf = conf; } @@ -139,7 +128,7 @@ public DelegationTokenCancelThread() { super("Delegation Token Canceler"); setDaemon(true); } - public void cancelToken(Token token, + public void cancelToken(Token token, Configuration conf) { TokenWithConf tokenWithConf = new TokenWithConf(token, conf); while (!queue.offer(tokenWithConf)) { @@ -158,25 +147,21 @@ public void run() { TokenWithConf tokenWithConf = null; try { tokenWithConf = queue.take(); - DistributedFileSystem dfs = null; - try { - // do it over rpc. For that we need DFS object - dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf); - } catch (Exception e) { - LOG.info("couldn't get DFS to cancel. Will retry over HTTPS"); - dfs = null; - } - - if(dfs != null) { - dfs.cancelDelegationToken(tokenWithConf.token); - } else { - cancelDelegationTokenOverHttps(tokenWithConf.token, - tokenWithConf.conf); - } + final TokenWithConf current = tokenWithConf; + if (LOG.isDebugEnabled()) { - LOG.debug("Canceling token " + tokenWithConf.token.getService() + - " for dfs=" + dfs); + LOG.debug("Canceling token " + tokenWithConf.token.getService()); } + // need to use doAs so that http can find the kerberos tgt + UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction() { + + @Override + public Void run() throws Exception { + current.token.cancel(current.conf); + return null; + } + }); } catch (IOException e) { LOG.warn("Failed to cancel token " + tokenWithConf.token + " " + StringUtils.stringifyException(e)); @@ -195,119 +180,29 @@ private static void addTokenToList(DelegationTokenToRenew t) { delegationTokens.add(t); } - // kind of tokens we currently renew - private static final Text kindHdfs = - DelegationTokenIdentifier.HDFS_DELEGATION_KIND; - - @SuppressWarnings("unchecked") public static synchronized void registerDelegationTokensForRenewal( - JobID jobId, Credentials ts, Configuration conf) { + JobID jobId, Credentials ts, Configuration conf) throws IOException { if(ts==null) return; //nothing to add - Collection > tokens = ts.getAllTokens(); + Collection > tokens = ts.getAllTokens(); long now = System.currentTimeMillis(); - - for(Token t : tokens) { - // currently we only check for HDFS delegation tokens - // later we can add more different types. - if(! t.getKind().equals(kindHdfs)) { - continue; - } - Token dt = - (Token)t; - + + for (Token t : tokens) { // first renew happens immediately - DelegationTokenToRenew dtr = - new DelegationTokenToRenew(jobId, dt, conf, now); + if (t.isManaged()) { + DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf, + now); - addTokenToList(dtr); - - setTimerForTokenRenewal(dtr, true); - LOG.info("registering token for renewal for service =" + dt.getService()+ - " and jobID = " + jobId); - } - } - - private static String getHttpAddressForToken( - Token token, final Configuration conf) - throws IOException { + addTokenToList(dtr); - String[] ipaddr = token.getService().toString().split(":"); - - InetAddress iaddr = InetAddress.getByName(ipaddr[0]); - String dnsName = iaddr.getCanonicalHostName(); - - // in case it is a different cluster it may have a different port - String httpsPort = conf.get("dfs.hftp.https.port"); - if(httpsPort == null) { - // get from this cluster - httpsPort = conf.get(DFSConfigKeys.DFS_HTTPS_PORT_KEY, - "" + DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT); - } - - // always use https (it is for security only) - return "https://" + dnsName+":"+httpsPort; - } - - protected static long renewDelegationTokenOverHttps( - final Token token, final Configuration conf) - throws InterruptedException, IOException{ - final String httpAddress = getHttpAddressForToken(token, conf); - // will be chaged to debug - LOG.info("address to renew=" + httpAddress + "; tok=" + token.getService()); - Long expDate = (Long) UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction() { - public Long run() throws IOException { - return DelegationTokenFetcher.renewDelegationToken(httpAddress, token); - } - }); - LOG.info("Renew over HTTP done. addr="+httpAddress+";res="+expDate); - return expDate; - } - - private static long renewDelegationToken(DelegationTokenToRenew dttr) - throws Exception { - long newExpirationDate=System.currentTimeMillis()+3600*1000; - Token token = dttr.token; - Configuration conf = dttr.conf; - if(token.getKind().equals(kindHdfs)) { - DistributedFileSystem dfs=null; - - try { - // do it over rpc. For that we need DFS object - dfs = getDFSForToken(token, conf); - } catch (IOException e) { - LOG.info("couldn't get DFS to renew. Will retry over HTTPS"); - dfs = null; + setTimerForTokenRenewal(dtr, true); + LOG.info("registering token for renewal for service =" + t.getService() + + " and jobID = " + jobId); } - - try { - if(dfs != null) - newExpirationDate = dfs.renewDelegationToken(token); - else { - // try HTTP - newExpirationDate = renewDelegationTokenOverHttps(token, conf); - } - } catch (InvalidToken ite) { - LOG.warn("invalid token - not scheduling for renew"); - removeFailedDelegationToken(dttr); - throw new IOException("failed to renew token", ite); - } catch (AccessControlException ioe) { - LOG.warn("failed to renew token:"+token, ioe); - removeFailedDelegationToken(dttr); - throw new IOException("failed to renew token", ioe); - } catch (Exception e) { - LOG.warn("failed to renew token:"+token, e); - // returns default expiration date - } - } else { - throw new Exception("unknown token type to renew:"+token.getKind()); } - return newExpirationDate; } - - + /** * Task - to renew a token * @@ -319,41 +214,29 @@ private static class RenewalTimerTask extends TimerTask { @Override public void run() { - Token token = dttr.token; + Token token = dttr.token; long newExpirationDate=0; try { - newExpirationDate = renewDelegationToken(dttr); - } catch (Exception e) { - return; // message logged in renewDT method - } - if (LOG.isDebugEnabled()) - LOG.debug("renewing for:"+token.getService()+";newED=" + - newExpirationDate); - - // new expiration date - dttr.expirationDate = newExpirationDate; - setTimerForTokenRenewal(dttr, false);// set the next one - } - } - - private static DistributedFileSystem getDFSForToken( - Token token, final Configuration conf) - throws Exception { - DistributedFileSystem dfs = null; - try { - final URI uri = new URI (SCHEME + "://" + token.getService().toString()); - dfs = - UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction() { - public DistributedFileSystem run() throws IOException { - return (DistributedFileSystem) FileSystem.get(uri, conf); + // need to use doAs so that http can find the kerberos tgt + dttr.expirationDate = UserGroupInformation.getLoginUser().doAs( + new PrivilegedExceptionAction() { + + @Override + public Long run() throws Exception { + return dttr.token.renew(dttr.conf); + } + }); + + if (LOG.isDebugEnabled()) { + LOG.debug("renewing for:" + token.getService() + ";newED=" + + dttr.expirationDate); } - }); - } catch (Exception e) { - LOG.warn("Failed to create a dfs to renew/cancel for:" + token.getService(), e); - throw e; - } - return dfs; + setTimerForTokenRenewal(dttr, false);// set the next one + } catch (Exception e) { + LOG.error("Exception renewing token" + token + ". Not rescheduled", e); + removeFailedDelegationToken(dttr); + } + } } /** @@ -372,15 +255,11 @@ private static void setTimerForTokenRenewal( renewIn = now + expiresIn - expiresIn/10; // little before expiration } - try { - // need to create new timer every time - TimerTask tTask = new RenewalTimerTask(token); - token.setTimerTask(tTask); // keep reference to the timer + // need to create new timer every time + TimerTask tTask = new RenewalTimerTask(token); + token.setTimerTask(tTask); // keep reference to the timer - renewalTimer.schedule(token.timerTask, new Date(renewIn)); - } catch (Exception e) { - LOG.warn("failed to schedule a task, token will not renew more", e); - } + renewalTimer.schedule(token.timerTask, new Date(renewIn)); } /** @@ -391,33 +270,9 @@ static public void close() { delegationTokens.clear(); } - - protected static void cancelDelegationTokenOverHttps( - final Token token, final Configuration conf) - throws InterruptedException, IOException{ - final String httpAddress = getHttpAddressForToken(token, conf); - // will be chaged to debug - LOG.info("address to cancel=" + httpAddress + "; tok=" + token.getService()); - - UserGroupInformation.getLoginUser().doAs( - new PrivilegedExceptionAction() { - public Void run() throws IOException { - DelegationTokenFetcher.cancelDelegationToken(httpAddress, token); - return null; - } - }); - LOG.info("Cancel over HTTP done. addr="+httpAddress); - } - - // cancel a token private static void cancelToken(DelegationTokenToRenew t) { - Token token = t.token; - Configuration conf = t.conf; - - if(token.getKind().equals(kindHdfs)) { - dtCancelThread.cancelToken(token, conf); - } + dtCancelThread.cancelToken(t.token, t.conf); } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java index 17a6541593..20c74f1e41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.UserGroupInformation; @@ -35,7 +36,7 @@ @InterfaceStability.Unstable public class JobTokenIdentifier extends TokenIdentifier { private Text jobid; - final static Text KIND_NAME = new Text("mapreduce.job"); + public final static Text KIND_NAME = new Text("mapreduce.job"); /** * Default constructor @@ -86,4 +87,12 @@ public void readFields(DataInput in) throws IOException { public void write(DataOutput out) throws IOException { jobid.write(out); } + + @InterfaceAudience.Private + public static class Renewer extends Token.TrivialRenewer { + @Override + protected Text getKind() { + return KIND_NAME; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java index a1d736abf1..77e5817c91 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/token/delegation/DelegationTokenIdentifier.java @@ -30,7 +30,7 @@ @InterfaceStability.Unstable public class DelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { - static final Text MAPREDUCE_DELEGATION_KIND = + public static final Text MAPREDUCE_DELEGATION_KIND = new Text("MAPREDUCE_DELEGATION_TOKEN"); /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..609a02c9b5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1,2 @@ +org.apache.hadoop.mapred.JobClient$Renewer +org.apache.hadoop.mapreduce.security.token.JobTokenIndentifier$Renewer \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java index 5dbdc0418a..4465e39cba 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ApplicationTokenIdentifier.java @@ -22,8 +22,10 @@ import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -74,4 +76,11 @@ public UserGroupInformation getUser() { return UserGroupInformation.createRemoteUser(appId.toString()); } + @InterfaceAudience.Private + public static class Renewer extends Token.TrivialRenewer { + @Override + protected Text getKind() { + return KIND_NAME; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index 313e8333b7..68ef4e9949 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -24,8 +24,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -115,4 +117,12 @@ public UserGroupInformation getUser() { return UserGroupInformation.createRemoteUser(this.containerId.toString()); } + + @InterfaceAudience.Private + public static class Renewer extends Token.TrivialRenewer { + @Override + protected Text getKind() { + return KIND; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..c19ebc31e5 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1,2 @@ +org.apache.hadoop.yarn.security.ApplicationTokenIdentifier$Renewer +org.apache.hadoop.yarn.security.ContainerTokenIdentifier$Renewer \ No newline at end of file diff --git a/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer new file mode 100644 index 0000000000..c1e2bd58c8 --- /dev/null +++ b/hadoop-mapreduce-project/src/test/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -0,0 +1 @@ +org.apache.hadoop.mapreduce.security.token.TestDelegationTokenRenewal$Renewer diff --git a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java index b9d3891106..ebd27b4f62 100644 --- a/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java +++ b/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/security/token/TestDelegationTokenRenewal.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.net.URI; @@ -41,6 +42,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.SecretManager.InvalidToken; +import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.util.StringUtils; import org.junit.BeforeClass; import org.junit.Test; @@ -54,6 +56,53 @@ public class TestDelegationTokenRenewal { private static final Log LOG = LogFactory.getLog(TestDelegationTokenRenewal.class); + private static final Text KIND = + new Text("TestDelegationTokenRenewal.Token"); + + public static class Renewer extends TokenRenewer { + private static int counter = 0; + private static Token lastRenewed = null; + private static Token tokenToRenewIn2Sec = null; + + @Override + public boolean handleKind(Text kind) { + return KIND.equals(kind); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token t, Configuration conf) throws IOException { + MyToken token = (MyToken)t; + if(token.isCanceled()) { + throw new InvalidToken("token has been canceled"); + } + lastRenewed = token; + counter ++; + LOG.info("Called MYDFS.renewdelegationtoken " + token + + ";this dfs=" + this.hashCode() + ";c=" + counter); + if(tokenToRenewIn2Sec == token) { + // this token first renewal in 2 seconds + LOG.info("RENEW in 2 seconds"); + tokenToRenewIn2Sec=null; + return 2*1000 + System.currentTimeMillis(); + } else { + return 86400*1000 + System.currentTimeMillis(); + } + } + + @Override + public void cancel(Token t, Configuration conf) { + MyToken token = (MyToken)t; + LOG.info("Cancel token " + token); + token.cancelToken(); + } + + } + private static Configuration conf; @BeforeClass @@ -66,7 +115,7 @@ public static void setUp() throws Exception { System.out.println("scheme is : " + uri.getScheme()); conf.setClass("fs." + uri.getScheme() + ".impl", MyFS.class, DistributedFileSystem.class); FileSystem.setDefaultUri(conf, uri); - System.out.println("filesystem uri = " + FileSystem.getDefaultUri(conf).toString()); + LOG.info("filesystem uri = " + FileSystem.getDefaultUri(conf).toString()); } private static class MyDelegationTokenSecretManager extends DelegationTokenSecretManager { @@ -97,11 +146,14 @@ private static class MyToken extends Token { public MyToken(DelegationTokenIdentifier dtId1, MyDelegationTokenSecretManager sm) { super(dtId1, sm); + setKind(KIND); status = "GOOD"; } public boolean isCanceled() {return status.equals(CANCELED);} + public void cancelToken() {this.status=CANCELED;} + public String toString() { StringBuilder sb = new StringBuilder(1024); @@ -127,50 +179,19 @@ public String toString() { * exception */ static class MyFS extends DistributedFileSystem { - int counter=0; - MyToken token; - MyToken tokenToRenewIn2Sec; public MyFS() {} public void close() {} @Override public void initialize(URI uri, Configuration conf) throws IOException {} - @Override - public long renewDelegationToken(Token t) - throws InvalidToken, IOException { - MyToken token = (MyToken)t; - if(token.isCanceled()) { - throw new InvalidToken("token has been canceled"); - } - counter ++; - this.token = (MyToken)token; - System.out.println("Called MYDFS.renewdelegationtoken " + token); - if(tokenToRenewIn2Sec == token) { - // this token first renewal in 2 seconds - System.out.println("RENEW in 2 seconds"); - tokenToRenewIn2Sec=null; - return 2*1000 + System.currentTimeMillis(); - } else { - return 86400*1000 + System.currentTimeMillis(); - } - } @Override - public MyToken getDelegationToken(Text renewer) - throws IOException { - System.out.println("Called MYDFS.getdelegationtoken"); - return createTokens(renewer); - } - @Override - public void cancelDelegationToken(Token t) - throws IOException { - MyToken token = (MyToken)t; - token.cancelToken(); + public MyToken getDelegationToken(Text renewer) throws IOException { + MyToken result = createTokens(renewer); + LOG.info("Called MYDFS.getdelegationtoken " + result); + return result; } - public void setTokenToRenewIn2Sec(MyToken t) {tokenToRenewIn2Sec=t;} - public int getCounter() {return counter; } - public MyToken getToken() {return token;} } /** @@ -218,9 +239,9 @@ static MyToken createTokens(Text renewer) * @throws URISyntaxException */ @Test - public void testDTRenewal () throws IOException, URISyntaxException { + public void testDTRenewal () throws Exception { MyFS dfs = (MyFS)FileSystem.get(conf); - System.out.println("dfs="+(Object)dfs); + LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+conf.hashCode()); // Test 1. - add three tokens - make sure exactly one get's renewed // get the delegation tokens @@ -230,8 +251,8 @@ public void testDTRenewal () throws IOException, URISyntaxException { token3 = dfs.getDelegationToken(new Text("user3")); //to cause this one to be set for renew in 2 secs - dfs.setTokenToRenewIn2Sec(token1); - System.out.println("token="+token1+" should be renewed for 2 secs"); + Renewer.tokenToRenewIn2Sec = token1; + LOG.info("token="+token1+" should be renewed for 2 secs"); // two distinct Namenodes String nn1 = DelegationTokenRenewal.SCHEME + "://host1:0"; @@ -258,15 +279,13 @@ public void testDTRenewal () throws IOException, URISyntaxException { } catch (InterruptedException e) {} // since we cannot guarantee timely execution - let's give few chances - if(dfs.getCounter()==numberOfExpectedRenewals) + if(Renewer.counter==numberOfExpectedRenewals) break; } - System.out.println("Counter = " + dfs.getCounter() + ";t="+ - dfs.getToken()); assertEquals("renew wasn't called as many times as expected(4):", - numberOfExpectedRenewals, dfs.getCounter()); - assertEquals("most recently renewed token mismatch", dfs.getToken(), + numberOfExpectedRenewals, Renewer.counter); + assertEquals("most recently renewed token mismatch", Renewer.lastRenewed, token1); // Test 2. @@ -277,8 +296,8 @@ public void testDTRenewal () throws IOException, URISyntaxException { MyToken token4 = dfs.getDelegationToken(new Text("user4")); //to cause this one to be set for renew in 2 secs - dfs.setTokenToRenewIn2Sec(token4); - System.out.println("token="+token4+" should be renewed for 2 secs"); + Renewer.tokenToRenewIn2Sec = token4; + LOG.info("token="+token4+" should be renewed for 2 secs"); String nn4 = DelegationTokenRenewal.SCHEME + "://host4:0"; ts.addToken(new Text(nn4), token4); @@ -287,24 +306,23 @@ public void testDTRenewal () throws IOException, URISyntaxException { JobID jid2 = new JobID("job2",1); DelegationTokenRenewal.registerDelegationTokensForRenewal(jid2, ts, conf); DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jid2); - numberOfExpectedRenewals = dfs.getCounter(); // number of renewals so far + numberOfExpectedRenewals = Renewer.counter; // number of renewals so far try { Thread.sleep(6*1000); // sleep 6 seconds, so it has time to renew } catch (InterruptedException e) {} - System.out.println("Counter = " + dfs.getCounter() + ";t="+dfs.getToken()); + System.out.println("Counter = " + Renewer.counter + ";t="+ + Renewer.lastRenewed); // counter and the token should stil be the old ones assertEquals("renew wasn't called as many times as expected", - numberOfExpectedRenewals, dfs.getCounter()); + numberOfExpectedRenewals, Renewer.counter); // also renewing of the cancelled token should fail - boolean exception=false; try { - dfs.renewDelegationToken(token4); + token4.renew(conf); + fail("Renew of canceled token didn't fail"); } catch (InvalidToken ite) { //expected - exception = true; } - assertTrue("Renew of canceled token didn't fail", exception); } }