HDFS-6127. WebHDFS tokens cannot be renewed in HA setup. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579546 13f79535-47bb-0310-9956-ffa450edef68
Haohui Mai 2014-03-20 06:47:58 +00:00
parent 396c6c63a2
commit aa4a045925
7 changed files with 128 additions and 61 deletions


@@ -658,6 +658,8 @@ Release 2.4.0 - UNRELEASED
     HDFS-6105. NN web UI for DN list loads the same jmx page multiple times.
     (wheat9)
 
+    HDFS-6127. WebHDFS tokens cannot be renewed in HA setup. (wheat9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)


@@ -1054,7 +1054,8 @@ public void cancel(Token<?> token, Configuration conf) throws IOException {
     private static ClientProtocol getNNProxy(
         Token<DelegationTokenIdentifier> token, Configuration conf)
         throws IOException {
-      URI uri = HAUtil.getServiceUriFromToken(token);
+      URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME,
+          token);
       if (HAUtil.isTokenForLogicalUri(token) &&
           !HAUtil.isLogicalUri(conf, uri)) {
         // If the token is for a logical nameservice, but the configuration
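
The change above threads an explicit URI scheme into HAUtil.getServiceUriFromToken instead of letting the helper hard-code hdfs://. A minimal standalone sketch of the resulting resolution, including the logical-nameservice sanity check that follows it ("minidfs" is a hypothetical nameservice, and the empty Configuration is illustrative only):

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HAUtil;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;

    public class RenewalResolutionSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Token<DelegationTokenIdentifier> token =
            new Token<DelegationTokenIdentifier>();
        token.setService(new Text("ha-hdfs:minidfs"));

        // hdfs://minidfs -- the scheme is now the caller's choice.
        URI uri = HAUtil.getServiceUriFromToken(
            HdfsConstants.HDFS_URI_SCHEME, token);

        if (HAUtil.isTokenForLogicalUri(token)
            && !HAUtil.isLogicalUri(conf, uri)) {
          // The token names a logical nameservice that this configuration
          // does not define, so no failover proxy can be built for it.
          throw new IOException("Unable to map logical nameservice URI "
              + uri + " to a NameNode");
        }
      }
    }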


@@ -68,7 +68,6 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;


@@ -17,31 +17,12 @@
  */
 package org.apache.hadoop.hdfs;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -51,8 +32,28 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 public class HAUtil {
@@ -216,25 +217,16 @@ public static boolean isLogicalUri(
   }
 
   /**
-   * Parse the HDFS URI out of the provided token.
-   * @throws IOException if the token is invalid
+   * Parse the file system URI out of the provided token.
    */
-  public static URI getServiceUriFromToken(
-      Token<DelegationTokenIdentifier> token)
-      throws IOException {
+  public static URI getServiceUriFromToken(final String scheme,
+      Token<?> token) {
     String tokStr = token.getService().toString();
     if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) {
       tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, "");
     }
-    try {
-      return new URI(HdfsConstants.HDFS_URI_SCHEME + "://" +
-          tokStr);
-    } catch (URISyntaxException e) {
-      throw new IOException("Invalid token contents: '" +
-          tokStr + "'");
-    }
+    return URI.create(scheme + "://" + tokStr);
   }
/**
@@ -251,8 +243,7 @@ public static Text buildTokenServiceForLogicalUri(URI uri) {
    * @return true if this token corresponds to a logical nameservice
    * rather than a specific namenode.
    */
-  public static boolean isTokenForLogicalUri(
-      Token<DelegationTokenIdentifier> token) {
+  public static boolean isTokenForLogicalUri(Token<?> token) {
     return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
   }
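
Two details of the rewritten helpers are easy to miss: both now accept any Token<?> (which the WebHDFS renewer further down needs, since it handles tokens generically), and getServiceUriFromToken no longer declares a checked IOException; a malformed service string now surfaces as the unchecked IllegalArgumentException thrown by URI.create. A short sketch of the prefix handling, with hypothetical service strings:

    import org.apache.hadoop.hdfs.HAUtil;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;

    public class ServiceStringSketch {
      public static void main(String[] args) {
        Token<DelegationTokenIdentifier> token =
            new Token<DelegationTokenIdentifier>();

        // Logical (HA) service: the "ha-hdfs:" prefix is stripped first.
        token.setService(new Text("ha-hdfs:minidfs"));
        System.out.println(HAUtil.isTokenForLogicalUri(token));       // true
        System.out.println(HAUtil.getServiceUriFromToken("webhdfs", token));
        // -> webhdfs://minidfs

        // Physical service: a host:port pair passes through unchanged.
        token.setService(new Text("127.0.0.1:8020"));
        System.out.println(HAUtil.isTokenForLogicalUri(token));       // false
        System.out.println(HAUtil.getServiceUriFromToken("hdfs", token));
        // -> hdfs://127.0.0.1:8020
      }
    }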
@@ -300,7 +291,6 @@ public static void cloneDelegationTokenForLogicalUri(
    * @return the internet address of the currently-active NN.
    * @throws IOException if an error occurs while resolving the active NN.
    */
-  @SuppressWarnings("deprecation")
   public static InetSocketAddress getAddressOfActive(FileSystem fs)
       throws IOException {
     if (!(fs instanceof DistributedFileSystem)) {


@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.web;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -28,9 +30,10 @@
 import org.apache.hadoop.fs.DelegationTokenRenewer;
 import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -71,23 +74,32 @@ public long renew(Token<?> token, Configuration conf) throws IOException {
     }
 
     private TokenManagementDelegator getInstance(Token<?> token,
-        Configuration conf) throws IOException {
-      final InetSocketAddress address = SecurityUtil.getTokenServiceAddr(token);
-      Text kind = token.getKind();
+        Configuration conf)
+        throws IOException {
       final URI uri;
+      final String scheme = getSchemeByKind(token.getKind());
+      if (HAUtil.isTokenForLogicalUri(token)) {
+        uri = HAUtil.getServiceUriFromToken(scheme, token);
+      } else {
+        final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
+            (token);
+        uri = URI.create(scheme + "://" + NetUtils.getHostPortString(address));
+      }
+      return (TokenManagementDelegator) FileSystem.get(uri, conf);
+    }
+
+    private static String getSchemeByKind(Text kind) {
       if (kind.equals(HftpFileSystem.TOKEN_KIND)) {
-        uri = DFSUtil.createUri(HftpFileSystem.SCHEME, address);
+        return HftpFileSystem.SCHEME;
       } else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) {
-        uri = DFSUtil.createUri(HsftpFileSystem.SCHEME, address);
+        return HsftpFileSystem.SCHEME;
       } else if (kind.equals(WebHdfsFileSystem.TOKEN_KIND)) {
-        uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, address);
+        return WebHdfsFileSystem.SCHEME;
      } else if (kind.equals(SWebHdfsFileSystem.TOKEN_KIND)) {
-        uri = DFSUtil.createUri(SWebHdfsFileSystem.SCHEME, address);
+        return SWebHdfsFileSystem.SCHEME;
       } else {
         throw new IllegalArgumentException("Unsupported scheme");
       }
-      return (TokenManagementDelegator) FileSystem.get(uri, conf);
     }
   }
 }
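
For context on how this renewer is reached at all: token.renew(conf) knows nothing about filesystems. It locates a TokenRenewer for the token's kind through ServiceLoader, and only then does the code above map the kind back to a filesystem scheme and, for HA tokens, to a logical URI instead of a single socket address. A simplified paraphrase of that dispatch (the standard org.apache.hadoop.security.token.Token mechanism, not part of this patch):

    import java.io.IOException;
    import java.util.ServiceLoader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenRenewer;

    public class RenewDispatchSketch {
      // Paraphrase of Token#renew: scan the registered renewers for one
      // that handles this token kind, then delegate to it.
      static long renew(Token<?> token, Configuration conf)
          throws IOException, InterruptedException {
        for (TokenRenewer renewer : ServiceLoader.load(TokenRenewer.class)) {
          if (renewer.handleKind(token.getKind())) {
            return renewer.renew(token, conf);
          }
        }
        throw new IOException("No renewer defined for token kind "
            + token.getKind());
      }
    }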


@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HAUtil;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import javax.servlet.ServletContext;
+import java.io.IOException;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestDatanodeWebHdfsMethods {
+  private static final String LOGICAL_NAME = "minidfs";
+
+  @Test
+  public void testDeserializeHAToken() throws IOException {
+    Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
+    DataNode dn = mock(DataNode.class);
+    doReturn(conf).when(dn).getConf();
+    ServletContext context = mock(ServletContext.class);
+    doReturn(dn).when(context).getAttribute("datanode");
+    final Token<DelegationTokenIdentifier> token = new
+        Token<DelegationTokenIdentifier>();
+    DatanodeWebHdfsMethods method = new DatanodeWebHdfsMethods();
+    Whitebox.setInternalState(method, "context", context);
+    final Token<DelegationTokenIdentifier> tok2 = method.deserializeToken
+        (token.encodeToUrlString(), LOGICAL_NAME);
+    Assert.assertTrue(HAUtil.isTokenForLogicalUri(tok2));
+  }
+}


@@ -22,8 +22,12 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
@@ -33,6 +37,9 @@
 import java.io.IOException;
 import java.net.URI;
 
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 public class TestWebHDFSForHA {
   private static final String LOGICAL_NAME = "minidfs";
   private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME +
@@ -75,10 +82,10 @@ public void testHA() throws IOException {
   }
 
   @Test
-  public void testSecureHA() throws IOException {
+  public void testSecureHAToken() throws IOException, InterruptedException {
     Configuration conf = DFSTestUtil.newHAConfiguration(LOGICAL_NAME);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
-        true);
+    conf.setBoolean(DFSConfigKeys
+        .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
 
     MiniDFSCluster cluster = null;
     WebHdfsFileSystem fs = null;
@@ -89,16 +96,18 @@ public void testSecureHA() throws IOException {
       HATestUtil.setFailoverConfigurations(cluster, conf, LOGICAL_NAME);
       cluster.waitActive();
 
-      fs = (WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf);
+      fs = spy((WebHdfsFileSystem) FileSystem.get(WEBHDFS_URI, conf));
+      FileSystemTestHelper.addFileSystemForTesting(WEBHDFS_URI, conf, fs);
 
       cluster.transitionToActive(0);
       Token<?> token = fs.getDelegationToken(null);
 
       cluster.shutdownNameNode(0);
       cluster.transitionToActive(1);
-
-      fs.renewDelegationToken(token);
-      fs.cancelDelegationToken(token);
+      token.renew(conf);
+      token.cancel(conf);
+      verify(fs).renewDelegationToken(token);
+      verify(fs).cancelDelegationToken(token);
     } finally {
       IOUtils.cleanup(null, fs);
       if (cluster != null) {
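
The switch from fs.renewDelegationToken(token) to token.renew(conf) is the point of the revised test: the direct call never left the filesystem object, while token.renew(conf) walks the TokenRenewer lookup and kind-to-scheme mapping shown earlier, which is exactly the path that failed in an HA setup (it also throws InterruptedException, hence the new test signature). Spying the filesystem and registering the spy through FileSystemTestHelper.addFileSystemForTesting makes the renewer's FileSystem.get call resolve to that same instance, so the two verify(fs) assertions prove that renewal and cancellation round-tripped through the renewer machinery after the failover from nn0 to nn1.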