HDFS-8855. Webhdfs client leaks active NameNode connections. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
e8a87d739f
commit
fe5624b85d
@ -19,6 +19,8 @@
|
|||||||
package org.apache.hadoop.security.token;
|
package org.apache.hadoop.security.token;
|
||||||
|
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.primitives.Bytes;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -32,6 +34,7 @@
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client-side form of the token.
|
* The client-side form of the token.
|
||||||
@ -338,6 +341,11 @@ public String toString() {
|
|||||||
return buffer.toString();
|
return buffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String buildCacheKey() {
|
||||||
|
return UUID.nameUUIDFromBytes(
|
||||||
|
Bytes.concat(kind.getBytes(), identifier, password)).toString();
|
||||||
|
}
|
||||||
|
|
||||||
private static ServiceLoader<TokenRenewer> renewers =
|
private static ServiceLoader<TokenRenewer> renewers =
|
||||||
ServiceLoader.load(TokenRenewer.class);
|
ServiceLoader.load(TokenRenewer.class);
|
||||||
|
|
||||||
|
@ -2373,6 +2373,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
|
HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
|
||||||
(Wei-Chiu Chuang via cnauroth)
|
(Wei-Chiu Chuang via cnauroth)
|
||||||
|
|
||||||
|
HDFS-8855. Webhdfs client leaks active NameNode connections.
|
||||||
|
(Xiaobing Zhou via xyao)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -70,6 +70,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
|
public static final String DFS_WEBHDFS_NETTY_HIGH_WATERMARK =
|
||||||
"dfs.webhdfs.netty.high.watermark";
|
"dfs.webhdfs.netty.high.watermark";
|
||||||
public static final int DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
|
public static final int DFS_WEBHDFS_NETTY_HIGH_WATERMARK_DEFAULT = 65535;
|
||||||
|
public static final String DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY =
|
||||||
|
"dfs.webhdfs.ugi.expire.after.access";
|
||||||
|
public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
|
||||||
|
10*60*1000; //10 minutes
|
||||||
|
|
||||||
// HA related configuration
|
// HA related configuration
|
||||||
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
|
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
@ -74,7 +75,6 @@ public class DatanodeHttpServer implements Closeable {
|
|||||||
private final Configuration confForCreate;
|
private final Configuration confForCreate;
|
||||||
private InetSocketAddress httpAddress;
|
private InetSocketAddress httpAddress;
|
||||||
private InetSocketAddress httpsAddress;
|
private InetSocketAddress httpsAddress;
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
|
static final Log LOG = LogFactory.getLog(DatanodeHttpServer.class);
|
||||||
|
|
||||||
public DatanodeHttpServer(final Configuration conf,
|
public DatanodeHttpServer(final Configuration conf,
|
||||||
@ -99,7 +99,7 @@ public DatanodeHttpServer(final Configuration conf,
|
|||||||
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
|
||||||
this.infoServer.addServlet(null, "/blockScannerReport",
|
this.infoServer.addServlet(null, "/blockScannerReport",
|
||||||
BlockScanner.Servlet.class);
|
BlockScanner.Servlet.class);
|
||||||
|
DataNodeUGIProvider.init(conf);
|
||||||
this.infoServer.start();
|
this.infoServer.start();
|
||||||
final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
|
final InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
|
||||||
|
|
||||||
|
@ -13,50 +13,103 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.cache.Cache;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create UGI from the request for the WebHDFS requests for the DNs. Note that
|
* Create UGI from the request for the WebHDFS requests for the DNs. Note that
|
||||||
* the DN does not authenticate the UGI -- the NN will authenticate them in
|
* the DN does not authenticate the UGI -- the NN will authenticate them in
|
||||||
* subsequent operations.
|
* subsequent operations.
|
||||||
*/
|
*/
|
||||||
class DataNodeUGIProvider {
|
public class DataNodeUGIProvider {
|
||||||
private final ParameterParser params;
|
private final ParameterParser params;
|
||||||
|
@VisibleForTesting
|
||||||
|
static Cache<String, UserGroupInformation> ugiCache;
|
||||||
|
public static final Log LOG = LogFactory.getLog(Client.class);
|
||||||
|
|
||||||
DataNodeUGIProvider(ParameterParser params) {
|
DataNodeUGIProvider(ParameterParser params) {
|
||||||
this.params = params;
|
this.params = params;
|
||||||
}
|
}
|
||||||
|
|
||||||
UserGroupInformation ugi() throws IOException {
|
public static synchronized void init(Configuration conf) {
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (ugiCache == null) {
|
||||||
return tokenUGI();
|
ugiCache = CacheBuilder
|
||||||
|
.newBuilder()
|
||||||
|
.expireAfterAccess(
|
||||||
|
conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT),
|
||||||
|
TimeUnit.MILLISECONDS).build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UserGroupInformation ugi() throws IOException {
|
||||||
|
UserGroupInformation ugi;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
final Token<DelegationTokenIdentifier> token = params.delegationToken();
|
||||||
|
|
||||||
|
ugi = ugiCache.get(buildTokenCacheKey(token),
|
||||||
|
new Callable<UserGroupInformation>() {
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation call() throws Exception {
|
||||||
|
return tokenUGI(token);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
final String usernameFromQuery = params.userName();
|
final String usernameFromQuery = params.userName();
|
||||||
final String doAsUserFromQuery = params.doAsUser();
|
final String doAsUserFromQuery = params.doAsUser();
|
||||||
final String remoteUser = usernameFromQuery == null
|
final String remoteUser = usernameFromQuery == null ? JspHelper
|
||||||
? JspHelper.getDefaultWebUserName(params.conf()) // not specified in
|
.getDefaultWebUserName(params.conf()) // not specified in request
|
||||||
// request
|
|
||||||
: usernameFromQuery;
|
: usernameFromQuery;
|
||||||
|
|
||||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
|
ugi = ugiCache.get(
|
||||||
JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
|
buildNonTokenCacheKey(doAsUserFromQuery, remoteUser),
|
||||||
if (doAsUserFromQuery != null) {
|
new Callable<UserGroupInformation>() {
|
||||||
// create and attempt to authorize a proxy user
|
@Override
|
||||||
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
|
public UserGroupInformation call() throws Exception {
|
||||||
|
return nonTokenUGI(usernameFromQuery, doAsUserFromQuery,
|
||||||
|
remoteUser);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause instanceof IOException) {
|
||||||
|
throw (IOException) cause;
|
||||||
|
} else {
|
||||||
|
throw new IOException(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ugi;
|
return ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
private UserGroupInformation tokenUGI() throws IOException {
|
private String buildTokenCacheKey(Token<DelegationTokenIdentifier> token) {
|
||||||
Token<DelegationTokenIdentifier> token = params.delegationToken();
|
return token.buildCacheKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
private UserGroupInformation tokenUGI(Token<DelegationTokenIdentifier> token)
|
||||||
|
throws IOException {
|
||||||
ByteArrayInputStream buf =
|
ByteArrayInputStream buf =
|
||||||
new ByteArrayInputStream(token.getIdentifier());
|
new ByteArrayInputStream(token.getIdentifier());
|
||||||
DataInputStream in = new DataInputStream(buf);
|
DataInputStream in = new DataInputStream(buf);
|
||||||
@ -67,4 +120,23 @@ private UserGroupInformation tokenUGI() throws IOException {
|
|||||||
return ugi;
|
return ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String buildNonTokenCacheKey(String doAsUserFromQuery,
|
||||||
|
String remoteUser) throws IOException {
|
||||||
|
String key = doAsUserFromQuery == null ? String.format("{%s}", remoteUser)
|
||||||
|
: String.format("{%s}:{%s}", remoteUser, doAsUserFromQuery);
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
private UserGroupInformation nonTokenUGI(String usernameFromQuery,
|
||||||
|
String doAsUserFromQuery, String remoteUser) throws IOException {
|
||||||
|
|
||||||
|
UserGroupInformation ugi = UserGroupInformation
|
||||||
|
.createRemoteUser(remoteUser);
|
||||||
|
JspHelper.checkUsername(ugi.getShortUserName(), usernameFromQuery);
|
||||||
|
if (doAsUserFromQuery != null) {
|
||||||
|
// create and attempt to authorize a proxy user
|
||||||
|
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
|
||||||
|
}
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2481,6 +2481,14 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.webhdfs.ugi.expire.after.access</name>
|
||||||
|
<value>600000</value>
|
||||||
|
<description>How long in milliseconds after the last access
|
||||||
|
the cached UGI will expire. With 0, never expire.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
|
<name>dfs.namenode.blocks.per.postponedblocks.rescan</name>
|
||||||
<value>10000</value>
|
<value>10000</value>
|
||||||
|
@ -0,0 +1,235 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||||
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
public class TestDataNodeUGIProvider {
|
||||||
|
private final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://"
|
||||||
|
+ "127.0.0.1:0");
|
||||||
|
private final String PATH = "/foo";
|
||||||
|
private final int OFFSET = 42;
|
||||||
|
private final int LENGTH = 512;
|
||||||
|
private final static int EXPIRE_AFTER_ACCESS = 5*1000;
|
||||||
|
private Configuration conf;
|
||||||
|
@Before
|
||||||
|
public void setUp(){
|
||||||
|
conf = WebHdfsTestUtil.createConf();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_KEY,
|
||||||
|
EXPIRE_AFTER_ACCESS);
|
||||||
|
DataNodeUGIProvider.init(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUGICacheSecure() throws Exception {
|
||||||
|
// fake turning on security so api thinks it should use tokens
|
||||||
|
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
|
||||||
|
UserGroupInformation ugi = UserGroupInformation
|
||||||
|
.createRemoteUser("test-user");
|
||||||
|
ugi.setAuthenticationMethod(KERBEROS);
|
||||||
|
ugi = UserGroupInformation.createProxyUser("test-proxy-user", ugi);
|
||||||
|
UserGroupInformation.setLoginUser(ugi);
|
||||||
|
|
||||||
|
List<Token<DelegationTokenIdentifier>> tokens = Lists.newArrayList();
|
||||||
|
getWebHdfsFileSystem(ugi, conf, tokens);
|
||||||
|
|
||||||
|
String uri1 = WebHdfsFileSystem.PATH_PREFIX
|
||||||
|
+ PATH
|
||||||
|
+ "?op=OPEN"
|
||||||
|
+ Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
|
||||||
|
new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
|
||||||
|
new DelegationParam(tokens.get(0).encodeToUrlString()));
|
||||||
|
|
||||||
|
String uri2 = WebHdfsFileSystem.PATH_PREFIX
|
||||||
|
+ PATH
|
||||||
|
+ "?op=OPEN"
|
||||||
|
+ Param.toSortedString("&", new NamenodeAddressParam("127.0.0.1:1010"),
|
||||||
|
new OffsetParam((long) OFFSET), new LengthParam((long) LENGTH),
|
||||||
|
new DelegationParam(tokens.get(1).encodeToUrlString()));
|
||||||
|
|
||||||
|
DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
|
||||||
|
new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
|
||||||
|
UserGroupInformation ugi11 = ugiProvider1.ugi();
|
||||||
|
UserGroupInformation ugi12 = ugiProvider1.ugi();
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"With UGI cache, two UGIs returned by the same token should be same",
|
||||||
|
ugi11, ugi12);
|
||||||
|
|
||||||
|
DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
|
||||||
|
new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
|
||||||
|
UserGroupInformation url21 = ugiProvider2.ugi();
|
||||||
|
UserGroupInformation url22 = ugiProvider2.ugi();
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"With UGI cache, two UGIs returned by the same token should be same",
|
||||||
|
url21, url22);
|
||||||
|
|
||||||
|
Assert.assertNotEquals(
|
||||||
|
"With UGI cache, two UGIs for the different token should not be same",
|
||||||
|
ugi11, url22);
|
||||||
|
|
||||||
|
awaitCacheEmptyDueToExpiration();
|
||||||
|
ugi12 = ugiProvider1.ugi();
|
||||||
|
url22 = ugiProvider2.ugi();
|
||||||
|
|
||||||
|
String msg = "With cache eviction, two UGIs returned" +
|
||||||
|
" by the same token should not be same";
|
||||||
|
Assert.assertNotEquals(msg, ugi11, ugi12);
|
||||||
|
Assert.assertNotEquals(msg, url21, url22);
|
||||||
|
|
||||||
|
Assert.assertNotEquals(
|
||||||
|
"With UGI cache, two UGIs for the different token should not be same",
|
||||||
|
ugi11, url22);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUGICacheInSecure() throws Exception {
|
||||||
|
String uri1 = WebHdfsFileSystem.PATH_PREFIX
|
||||||
|
+ PATH
|
||||||
|
+ "?op=OPEN"
|
||||||
|
+ Param.toSortedString("&", new OffsetParam((long) OFFSET),
|
||||||
|
new LengthParam((long) LENGTH), new UserParam("root"));
|
||||||
|
|
||||||
|
String uri2 = WebHdfsFileSystem.PATH_PREFIX
|
||||||
|
+ PATH
|
||||||
|
+ "?op=OPEN"
|
||||||
|
+ Param.toSortedString("&", new OffsetParam((long) OFFSET),
|
||||||
|
new LengthParam((long) LENGTH), new UserParam("hdfs"));
|
||||||
|
|
||||||
|
DataNodeUGIProvider ugiProvider1 = new DataNodeUGIProvider(
|
||||||
|
new ParameterParser(new QueryStringDecoder(URI.create(uri1)), conf));
|
||||||
|
UserGroupInformation ugi11 = ugiProvider1.ugi();
|
||||||
|
UserGroupInformation ugi12 = ugiProvider1.ugi();
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"With UGI cache, two UGIs for the same user should be same", ugi11,
|
||||||
|
ugi12);
|
||||||
|
|
||||||
|
DataNodeUGIProvider ugiProvider2 = new DataNodeUGIProvider(
|
||||||
|
new ParameterParser(new QueryStringDecoder(URI.create(uri2)), conf));
|
||||||
|
UserGroupInformation url21 = ugiProvider2.ugi();
|
||||||
|
UserGroupInformation url22 = ugiProvider2.ugi();
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"With UGI cache, two UGIs for the same user should be same", url21,
|
||||||
|
url22);
|
||||||
|
|
||||||
|
Assert.assertNotEquals(
|
||||||
|
"With UGI cache, two UGIs for the different user should not be same",
|
||||||
|
ugi11, url22);
|
||||||
|
|
||||||
|
awaitCacheEmptyDueToExpiration();
|
||||||
|
ugi12 = ugiProvider1.ugi();
|
||||||
|
url22 = ugiProvider2.ugi();
|
||||||
|
|
||||||
|
String msg = "With cache eviction, two UGIs returned by" +
|
||||||
|
" the same user should not be same";
|
||||||
|
Assert.assertNotEquals(msg, ugi11, ugi12);
|
||||||
|
Assert.assertNotEquals(msg, url21, url22);
|
||||||
|
|
||||||
|
Assert.assertNotEquals(
|
||||||
|
"With UGI cache, two UGIs for the different user should not be same",
|
||||||
|
ugi11, url22);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for expiration of entries from the UGI cache. We need to be careful
|
||||||
|
* not to touch the entries in the cache while we're waiting for expiration.
|
||||||
|
* If we did, then that would reset the clock on expiration for those entries.
|
||||||
|
* Instead, we trigger internal clean-up of the cache and check for size 0.
|
||||||
|
*
|
||||||
|
* @throws Exception if there is any error
|
||||||
|
*/
|
||||||
|
private void awaitCacheEmptyDueToExpiration() throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
DataNodeUGIProvider.ugiCache.cleanUp();
|
||||||
|
return DataNodeUGIProvider.ugiCache.size() == 0;
|
||||||
|
}
|
||||||
|
}, EXPIRE_AFTER_ACCESS, 10 * EXPIRE_AFTER_ACCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WebHdfsFileSystem getWebHdfsFileSystem(UserGroupInformation ugi,
|
||||||
|
Configuration conf, List<Token<DelegationTokenIdentifier>> tokens)
|
||||||
|
throws IOException {
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
|
||||||
|
ugi.getUserName()), null, null);
|
||||||
|
FSNamesystem namesystem = mock(FSNamesystem.class);
|
||||||
|
DelegationTokenSecretManager dtSecretManager = new DelegationTokenSecretManager(
|
||||||
|
86400000, 86400000, 86400000, 86400000, namesystem);
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
Token<DelegationTokenIdentifier> token1 = new Token<DelegationTokenIdentifier>(
|
||||||
|
dtId, dtSecretManager);
|
||||||
|
Token<DelegationTokenIdentifier> token2 = new Token<DelegationTokenIdentifier>(
|
||||||
|
dtId, dtSecretManager);
|
||||||
|
SecurityUtil.setTokenService(token1,
|
||||||
|
NetUtils.createSocketAddr(uri.getAuthority()));
|
||||||
|
SecurityUtil.setTokenService(token2,
|
||||||
|
NetUtils.createSocketAddr(uri.getAuthority()));
|
||||||
|
token1.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
|
||||||
|
token2.setKind(WebHdfsConstants.WEBHDFS_TOKEN_KIND);
|
||||||
|
|
||||||
|
tokens.add(token1);
|
||||||
|
tokens.add(token2);
|
||||||
|
|
||||||
|
ugi.addToken(token1);
|
||||||
|
ugi.addToken(token2);
|
||||||
|
}
|
||||||
|
return (WebHdfsFileSystem) FileSystem.get(uri, conf);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user