From 481f84597bf842df45b068cc24c328112e8bcf40 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Sat, 25 Feb 2012 00:03:26 +0000 Subject: [PATCH] HDFS-2904. Client support for getting delegation tokens. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1293486 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-1623.txt | 2 + .../org/apache/hadoop/hdfs/DFSClient.java | 90 ++++--- .../java/org/apache/hadoop/hdfs/HAUtil.java | 176 +++++++++++-- .../hadoop/hdfs/protocol/HdfsConstants.java | 8 + .../delegation/DelegationTokenSelector.java | 5 + .../server/balancer/NameNodeConnector.java | 24 +- .../ha/ConfiguredFailoverProxyProvider.java | 6 + .../ha/TestDelegationTokensWithHA.java | 234 ++++++++++++++++++ 8 files changed, 479 insertions(+), 66 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index fbdf1ecf36..e8e63bf702 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -230,3 +230,5 @@ HDFS-2973. Re-enable NO_ACK optimization for block deletion. (todd) HDFS-2922. HA: close out operation categories (eli) HDFS-2993. HA: BackupNode#checkOperation should permit CHECKPOINT operations (eli) + +HDFS-2904. Client support for getting delegation tokens. (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index a80e34b2eb..dcbc88f10c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -30,6 +30,7 @@ import java.net.Socket; import java.net.SocketException; import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -60,6 +61,8 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; + +import org.apache.hadoop.hdfs.HAUtil.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; @@ -107,6 +110,8 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; +import com.google.common.base.Preconditions; + /******************************************************** * DFSClient can connect to a Hadoop Filesystem and * perform basic file tasks. It uses the ClientProtocol @@ -124,7 +129,9 @@ public class DFSClient implements java.io.Closeable { public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB final ClientProtocol namenode; - private final InetSocketAddress nnAddress; + /* The service used for delegation tokens */ + private Text dtService; + final UserGroupInformation ugi; volatile boolean clientRunning = true; private volatile FsServerDefaults serverDefaults; @@ -308,29 +315,22 @@ public DFSClient(URI nameNodeUri, Configuration conf, this.clientName = leaserenewer.getClientName(dfsClientConf.taskId); this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity); - ClientProtocol failoverNNProxy = (ClientProtocol) HAUtil - .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class); - if (nameNodeUri != null && failoverNNProxy != null) { - this.namenode = failoverNNProxy; - nnAddress = null; - } else if (nameNodeUri != null && rpcNamenode == null) { - this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf); - - // TODO(HA): This doesn't really apply in the case of HA. Need to get smart - // about tokens in an HA setup, generally. - nnAddress = NameNode.getAddress(nameNodeUri); - } else if (nameNodeUri == null && rpcNamenode != null) { - //This case is used for testing. + + + if (rpcNamenode != null) { + // This case is used for testing. + Preconditions.checkArgument(nameNodeUri == null); this.namenode = rpcNamenode; - - // TODO(HA): This doesn't really apply in the case of HA. Need to get smart - // about tokens in an HA setup, generally. - nnAddress = null; + dtService = null; } else { - throw new IllegalArgumentException( - "Expecting exactly one of nameNodeUri and rpcNamenode being null: " - + "nameNodeUri=" + nameNodeUri + ", rpcNamenode=" + rpcNamenode); + Preconditions.checkArgument(nameNodeUri != null, + "null URI"); + ProxyAndInfo proxyInfo = + HAUtil.createProxy(conf, nameNodeUri, ClientProtocol.class); + this.dtService = proxyInfo.getDelegationTokenService(); + this.namenode = proxyInfo.getProxy(); } + // read directly from the block file if configured. this.shortCircuitLocalReads = conf.getBoolean( DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, @@ -523,11 +523,13 @@ public FsServerDefaults getServerDefaults() throws IOException { */ public Token getDelegationToken(Text renewer) throws IOException { - Token result = + assert dtService != null; + Token token = namenode.getDelegationToken(renewer); - SecurityUtil.setTokenService(result, nnAddress); - LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(result)); - return result; + token.setService(this.dtService); + + LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token)); + return token; } /** @@ -658,13 +660,8 @@ public boolean handleKind(Text kind) { @Override public long renew(Token token, Configuration conf) throws IOException { Token delToken = - (Token) token; - LOG.info("Renewing " + - DelegationTokenIdentifier.stringifyToken(delToken)); - ClientProtocol nn = - DFSUtil.createNamenode - (SecurityUtil.getTokenServiceAddr(delToken), - conf, UserGroupInformation.getCurrentUser()); + (Token) token; + ClientProtocol nn = getNNProxy(delToken, conf); try { return nn.renewDelegationToken(delToken); } catch (RemoteException re) { @@ -680,9 +677,7 @@ public void cancel(Token token, Configuration conf) throws IOException { (Token) token; LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(delToken)); - ClientProtocol nn = DFSUtil.createNamenode( - SecurityUtil.getTokenServiceAddr(delToken), conf, - UserGroupInformation.getCurrentUser()); + ClientProtocol nn = getNNProxy(delToken, conf); try { nn.cancelDelegationToken(delToken); } catch (RemoteException re) { @@ -690,6 +685,31 @@ public void cancel(Token token, Configuration conf) throws IOException { AccessControlException.class); } } + + private static ClientProtocol getNNProxy( + Token token, Configuration conf) + throws IOException { + URI uri = HAUtil.getServiceUriFromToken(token); + if (HAUtil.isTokenForLogicalUri(token) && + !HAUtil.isLogicalUri(conf, uri)) { + // If the token is for a logical nameservice, but the configuration + // we have disagrees about that, we can't actually renew it. + // This can be the case in MR, for example, if the RM doesn't + // have all of the HA clusters configured in its configuration. + throw new IOException("Unable to map logical nameservice URI '" + + uri + "' to a NameNode. Local configuration does not have " + + "a failover proxy provider configured."); + } + + ProxyAndInfo info = + HAUtil.createProxy(conf, uri, ClientProtocol.class); + assert info.getDelegationTokenService().equals(token.getService()) : + "Returned service '" + info.getDelegationTokenService().toString() + + "' doesn't match expected service '" + + token.getService().toString() + "'"; + + return info.getProxy(); + } @Override public boolean isManaged(Token token) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java index 753cb3bf67..0a322140da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java @@ -22,6 +22,7 @@ import java.lang.reflect.Constructor; import java.net.InetSocketAddress; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -31,11 +32,21 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.FailoverProxyProvider; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -177,14 +188,14 @@ public static void setAllowStandbyReads(Configuration conf, boolean val) { /** Creates the Failover proxy provider instance*/ @SuppressWarnings("unchecked") - public static FailoverProxyProvider createFailoverProxyProvider( - Configuration conf, Class> failoverProxyProviderClass, - Class xface, URI nameNodeUri) throws IOException { + private static FailoverProxyProvider createFailoverProxyProvider( + Configuration conf, Class> failoverProxyProviderClass, + Class xface, URI nameNodeUri) throws IOException { Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { - Constructor> ctor = failoverProxyProviderClass + Constructor> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider provider = ctor.newInstance(conf, nameNodeUri, xface); @@ -203,7 +214,7 @@ public static FailoverProxyProvider createFailoverProxyProvider( } /** Gets the configured Failover proxy provider's class */ - public static Class> getFailoverProxyProviderClass( + private static Class> getFailoverProxyProviderClass( Configuration conf, URI nameNodeUri, Class xface) throws IOException { if (nameNodeUri == null) { return null; @@ -238,24 +249,161 @@ public static Class> getFailoverProxyProviderClass( } } } + + /** + * @return true if the given nameNodeUri appears to be a logical URI. + * This is the case if there is a failover proxy provider configured + * for it in the given configuration. + */ + public static boolean isLogicalUri( + Configuration conf, URI nameNodeUri) { + String host = nameNodeUri.getHost(); + String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + + host; + return conf.get(configKey) != null; + } - /** Creates the namenode proxy with the passed Protocol */ + /** + * Creates the namenode proxy with the passed Protocol. + * @param conf the configuration containing the required IPC + * properties, client failover configurations, etc. + * @param nameNodeUri the URI pointing either to a specific NameNode + * or to a logical nameservice. + * @param xface the IPC interface which should be created + * @return an object containing both the proxy and the associated + * delegation token service it corresponds to + **/ @SuppressWarnings("unchecked") - public static Object createFailoverProxy(Configuration conf, URI nameNodeUri, - Class xface) throws IOException { - Class> failoverProxyProviderClass = HAUtil - .getFailoverProxyProviderClass(conf, nameNodeUri, xface); - if (failoverProxyProviderClass != null) { - FailoverProxyProvider failoverProxyProvider = HAUtil + public static ProxyAndInfo createProxy( + Configuration conf, URI nameNodeUri, + Class xface) throws IOException { + Class> failoverProxyProviderClass = + HAUtil.getFailoverProxyProviderClass(conf, nameNodeUri, xface); + + if (failoverProxyProviderClass == null) { + // Non-HA case + return createNonHAProxy(conf, nameNodeUri, xface); + } else { + // HA case + FailoverProxyProvider failoverProxyProvider = HAUtil .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, nameNodeUri); Conf config = new Conf(conf); - return RetryProxy.create(xface, failoverProxyProvider, RetryPolicies + T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); + + Text dtService = buildTokenServiceForLogicalUri(nameNodeUri); + return new ProxyAndInfo(proxy, dtService); } - return null; } + @SuppressWarnings("unchecked") + private static ProxyAndInfo createNonHAProxy( + Configuration conf, URI nameNodeUri, Class xface) throws IOException { + InetSocketAddress nnAddr = NameNode.getAddress(nameNodeUri); + Text dtService = SecurityUtil.buildTokenService(nnAddr); + + if (xface == ClientProtocol.class) { + T proxy = (T)DFSUtil.createNamenode(nnAddr, conf); + return new ProxyAndInfo(proxy, dtService); + } else if (xface == NamenodeProtocol.class) { + T proxy = (T) DFSUtil.createNNProxyWithNamenodeProtocol( + nnAddr, conf, UserGroupInformation.getCurrentUser()); + return new ProxyAndInfo(proxy, dtService); + } else { + throw new AssertionError("Unsupported proxy type: " + xface); + } + } + + /** + * Parse the HDFS URI out of the provided token. + * @throws IOException if the token is invalid + */ + public static URI getServiceUriFromToken( + Token token) + throws IOException { + String tokStr = token.getService().toString(); + + if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) { + tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, ""); + } + + try { + return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" + + tokStr); + } catch (URISyntaxException e) { + throw new IOException("Invalid token contents: '" + + tokStr + "'"); + } + } + + /** + * Get the service name used in the delegation token for the given logical + * HA service. + * @param uri the logical URI of the cluster + * @return the service name + */ + public static Text buildTokenServiceForLogicalUri(URI uri) { + return new Text(HA_DT_SERVICE_PREFIX + uri.getHost()); + } + + /** + * @return true if this token corresponds to a logical nameservice + * rather than a specific namenode. + */ + public static boolean isTokenForLogicalUri( + Token token) { + return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); + } + + /** + * Locate a delegation token associated with the given HA cluster URI, and if + * one is found, clone it to also represent the underlying namenode address. + * @param ugi the UGI to modify + * @param haUri the logical URI for the cluster + * @param singleNNAddr one of the NNs in the cluster to which the token + * applies + */ + public static void cloneDelegationTokenForLogicalUri( + UserGroupInformation ugi, URI haUri, + InetSocketAddress singleNNAddr) { + Text haService = buildTokenServiceForLogicalUri(haUri); + Token haToken = + DelegationTokenSelector.selectHdfsDelegationToken(haService, ugi); + if (haToken == null) { + // no token + return; + } + Token specificToken = + new Token(haToken); + specificToken.setService(SecurityUtil.buildTokenService(singleNNAddr)); + ugi.addToken(specificToken); + LOG.debug("Mapped HA service delegation token for logical URI " + + haUri + " to namenode " + singleNNAddr); + } + + /** + * Wrapper for a client proxy as well as its associated service ID. + * This is simply used as a tuple-like return type for + * {@link HAUtil#createProxy(Configuration, URI, Class)}. + */ + public static class ProxyAndInfo { + private final PROXYTYPE proxy; + private final Text dtService; + + public ProxyAndInfo(PROXYTYPE proxy, Text dtService) { + this.proxy = proxy; + this.dtService = dtService; + } + + public PROXYTYPE getProxy() { + return proxy; + } + + public Text getDelegationTokenService() { + return dtService; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index 6b4835facc..da64b9e764 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -99,6 +99,14 @@ public static enum UpgradeAction { */ public static final String HDFS_URI_SCHEME = "hdfs"; + /** + * A prefix put before the namenode URI inside the "service" field + * of a delgation token, indicating that the URI is a logical (HA) + * URI. + */ + public static final String HA_DT_SERVICE_PREFIX = "ha-hdfs:"; + + /** * Please see {@link LayoutVersion} on adding new layout version. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java index 1822b27a1c..4f73b85164 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java @@ -59,6 +59,11 @@ public static Token selectHdfsDelegationToken( new InetSocketAddress(nnAddr.getHostName(), nnRpcPort)); return INSTANCE.selectToken(serviceName, ugi.getTokens()); } + + public static Token selectHdfsDelegationToken( + Text serviceName, UserGroupInformation ugi) { + return INSTANCE.selectToken(serviceName, ugi.getTokens()); + } public DelegationTokenSelector() { super(DelegationTokenIdentifier.HDFS_DELEGATION_KIND); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 04657715c2..e45f721b75 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; @@ -76,21 +74,13 @@ class NameNodeConnector { Configuration conf) throws IOException { this.namenodeAddress = Lists.newArrayList(haNNs).get(0); URI nameNodeUri = NameNode.getUri(this.namenodeAddress); - NamenodeProtocol failoverNamenode = (NamenodeProtocol) HAUtil - .createFailoverProxy(conf, nameNodeUri, NamenodeProtocol.class); - if (null != failoverNamenode) { - this.namenode = failoverNamenode; - } else { - this.namenode = DFSUtil.createNNProxyWithNamenodeProtocol( - this.namenodeAddress, conf, UserGroupInformation.getCurrentUser()); - } - ClientProtocol failOverClient = (ClientProtocol) HAUtil - .createFailoverProxy(conf, nameNodeUri, ClientProtocol.class); - if (null != failOverClient) { - this.client = failOverClient; - } else { - this.client = DFSUtil.createNamenode(conf); - } + + this.namenode = + HAUtil.createProxy(conf, nameNodeUri, NamenodeProtocol.class) + .getProxy(); + this.client = + HAUtil.createProxy(conf, nameNodeUri, ClientProtocol.class) + .getProxy(); this.fs = FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java index 6f6f88f9e8..79223a0455 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; @@ -95,6 +96,11 @@ public ConfiguredFailoverProxyProvider(Configuration conf, URI uri, for (InetSocketAddress address : addressesInNN.values()) { proxies.add(new AddressRpcProxyPair(address)); + + // The client may have a delegation token set for the logical + // URI of the cluster. Clone this token to apply to each of the + // underlying IPC addresses so that the IPC code can find it. + HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, address); } } catch (IOException e) { throw new RuntimeException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java new file mode 100644 index 0000000000..561e4d6103 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Joiner; + +/** + * Test case for client support of delegation tokens in an HA cluster. + * See HDFS-2904 for more info. + **/ +public class TestDelegationTokensWithHA { + private static Configuration conf = new Configuration(); + private static final Log LOG = + LogFactory.getLog(TestDelegationTokensWithHA.class); + private static MiniDFSCluster cluster; + private static NameNode nn0; + private static NameNode nn1; + private static FileSystem fs; + private static DelegationTokenSecretManager dtSecretManager; + private static DistributedFileSystem dfs; + + @BeforeClass + public static void setupCluster() throws Exception { + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); + conf.set("hadoop.security.auth_to_local", + "RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT"); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + cluster.waitActive(); + + nn0 = cluster.getNameNode(0); + nn1 = cluster.getNameNode(1); + fs = HATestUtil.configureFailoverFs(cluster, conf); + dfs = (DistributedFileSystem)fs; + + cluster.transitionToActive(0); + dtSecretManager = NameNodeAdapter.getDtSecretManager( + nn0.getNamesystem()); + } + + @AfterClass + public static void shutdownCluster() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + + @Test + public void testDelegationTokenDFSApi() throws Exception { + Token token = dfs.getDelegationToken("JobTracker"); + DelegationTokenIdentifier identifier = new DelegationTokenIdentifier(); + byte[] tokenId = token.getIdentifier(); + identifier.readFields(new DataInputStream( + new ByteArrayInputStream(tokenId))); + + // Ensure that it's present in the NN's secret manager and can + // be renewed directly from there. + LOG.info("A valid token should have non-null password, " + + "and should be renewed successfully"); + assertTrue(null != dtSecretManager.retrievePassword(identifier)); + dtSecretManager.renewToken(token, "JobTracker"); + + // Use the client conf with the failover info present to check + // renewal. + Configuration clientConf = dfs.getConf(); + doRenewOrCancel(token, clientConf, TokenTestAction.RENEW); + + // Using a configuration that doesn't have the logical nameservice + // configured should result in a reasonable error message. + Configuration emptyConf = new Configuration(); + try { + doRenewOrCancel(token, emptyConf, TokenTestAction.RENEW); + fail("Did not throw trying to renew with an empty conf!"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Unable to map logical nameservice URI", ioe); + } + + + // Ensure that the token can be renewed again after a failover. + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + doRenewOrCancel(token, clientConf, TokenTestAction.RENEW); + + doRenewOrCancel(token, clientConf, TokenTestAction.CANCEL); + } + + @SuppressWarnings("deprecation") + @Test + public void testDelegationTokenWithDoAs() throws Exception { + final Token token = + dfs.getDelegationToken("JobTracker"); + final UserGroupInformation longUgi = UserGroupInformation + .createRemoteUser("JobTracker/foo.com@FOO.COM"); + final UserGroupInformation shortUgi = UserGroupInformation + .createRemoteUser("JobTracker"); + longUgi.doAs(new PrivilegedExceptionAction() { + public Void run() throws Exception { + DistributedFileSystem dfs = (DistributedFileSystem) + HATestUtil.configureFailoverFs(cluster, conf); + // try renew with long name + dfs.renewDelegationToken(token); + return null; + } + }); + shortUgi.doAs(new PrivilegedExceptionAction() { + public Void run() throws Exception { + DistributedFileSystem dfs = (DistributedFileSystem) + HATestUtil.configureFailoverFs(cluster, conf); + dfs.renewDelegationToken(token); + return null; + } + }); + longUgi.doAs(new PrivilegedExceptionAction() { + public Void run() throws Exception { + DistributedFileSystem dfs = (DistributedFileSystem) + HATestUtil.configureFailoverFs(cluster, conf); + // try cancel with long name + dfs.cancelDelegationToken(token); + return null; + } + }); + } + + @Test + public void testHAUtilClonesDelegationTokens() throws Exception { + final Token token = + dfs.getDelegationToken("test"); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test"); + + URI haUri = new URI("hdfs://my-ha-uri/"); + token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri)); + ugi.addToken(token); + HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nn0.getNameNodeAddress()); + HAUtil.cloneDelegationTokenForLogicalUri(ugi, haUri, nn1.getNameNodeAddress()); + + Collection> tokens = ugi.getTokens(); + assertEquals(3, tokens.size()); + + LOG.info("Tokens:\n" + Joiner.on("\n").join(tokens)); + + // check that the token selected for one of the physical IPC addresses + // matches the one we received + InetSocketAddress addr = nn0.getNameNodeAddress(); + Text ipcDtService = new Text( + addr.getAddress().getHostAddress() + ":" + addr.getPort()); + Token token2 = + DelegationTokenSelector.selectHdfsDelegationToken(ipcDtService, ugi); + assertNotNull(token2); + assertArrayEquals(token.getIdentifier(), token2.getIdentifier()); + assertArrayEquals(token.getPassword(), token2.getPassword()); + } + + enum TokenTestAction { + RENEW, CANCEL; + } + + private static void doRenewOrCancel( + final Token token, final Configuration conf, + final TokenTestAction action) + throws IOException, InterruptedException { + UserGroupInformation.createRemoteUser("JobTracker").doAs( + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + switch (action) { + case RENEW: + token.renew(conf); + break; + case CANCEL: + token.cancel(conf); + break; + default: + fail("bad action:" + action); + } + return null; + } + }); + } +}