HDFS-6222. Remove background token renewer from webhdfs. Contributed by Rushabh Shah and Daryn Sharp.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1604300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bfc9c4b5a5
commit
0c51289695
@ -684,6 +684,9 @@ Release 2.5.0 - UNRELEASED
|
||||
HDFS-6535. HDFS quota update is wrong when file is appended. (George Wong
|
||||
via jing9)
|
||||
|
||||
HDFS-6222. Remove background token renewer from webhdfs.
|
||||
(Rushabh Shah and Daryn Sharp via cnauroth)
|
||||
|
||||
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
|
||||
|
@ -23,6 +23,8 @@
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
@ -75,4 +77,25 @@ public static String stringifyToken(final Token<?> token) throws IOException {
|
||||
return ident.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public static class WebHdfsDelegationTokenIdentifier
|
||||
extends DelegationTokenIdentifier {
|
||||
public WebHdfsDelegationTokenIdentifier() {
|
||||
super();
|
||||
}
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return WebHdfsFileSystem.TOKEN_KIND;
|
||||
}
|
||||
}
|
||||
|
||||
public static class SWebHdfsDelegationTokenIdentifier extends WebHdfsDelegationTokenIdentifier {
|
||||
public SWebHdfsDelegationTokenIdentifier() {
|
||||
super();
|
||||
}
|
||||
@Override
|
||||
public Text getKind() {
|
||||
return SWebHdfsFileSystem.TOKEN_KIND;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,8 @@ protected String getTransportScheme() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected synchronized void initializeTokenAspect() {
|
||||
tokenAspect = new TokenAspect<SWebHdfsFileSystem>(this, tokenServiceName, TOKEN_KIND);
|
||||
protected Text getTokenKind() {
|
||||
return TOKEN_KIND;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,11 +69,14 @@
|
||||
import org.apache.hadoop.io.retry.RetryUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
@ -98,7 +101,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||
|
||||
/** Delegation token kind */
|
||||
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||
protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
|
||||
private boolean canRefreshDelegationToken;
|
||||
|
||||
private UserGroupInformation ugi;
|
||||
private URI uri;
|
||||
@ -127,13 +130,8 @@ protected String getTransportScheme() {
|
||||
return "http";
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize tokenAspect. This function is intended to
|
||||
* be overridden by SWebHdfsFileSystem.
|
||||
*/
|
||||
protected synchronized void initializeTokenAspect() {
|
||||
tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
|
||||
TOKEN_KIND);
|
||||
protected Text getTokenKind() {
|
||||
return TOKEN_KIND;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -162,7 +160,6 @@ public synchronized void initialize(URI uri, Configuration conf
|
||||
this.tokenServiceName = isLogicalUri ?
|
||||
HAUtil.buildTokenServiceForLogicalUri(uri)
|
||||
: SecurityUtil.buildTokenService(getCanonicalUri());
|
||||
initializeTokenAspect();
|
||||
|
||||
if (!isHA) {
|
||||
this.retryPolicy =
|
||||
@ -195,10 +192,8 @@ public synchronized void initialize(URI uri, Configuration conf
|
||||
}
|
||||
|
||||
this.workingDir = getHomeDirectory();
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
tokenAspect.initDelegationToken(ugi);
|
||||
}
|
||||
this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
|
||||
this.delegationToken = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -213,11 +208,46 @@ public static boolean isEnabled(final Configuration conf, final Log log) {
|
||||
return b;
|
||||
}
|
||||
|
||||
TokenSelector<DelegationTokenIdentifier> tokenSelector =
|
||||
new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
|
||||
|
||||
// the first getAuthParams() for a non-token op will either get the
|
||||
// internal token from the ugi or lazy fetch one
|
||||
protected synchronized Token<?> getDelegationToken() throws IOException {
|
||||
tokenAspect.ensureTokenInitialized();
|
||||
if (canRefreshDelegationToken && delegationToken == null) {
|
||||
Token<?> token = tokenSelector.selectToken(
|
||||
new Text(getCanonicalServiceName()), ugi.getTokens());
|
||||
// ugi tokens are usually indicative of a task which can't
|
||||
// refetch tokens. even if ugi has credentials, don't attempt
|
||||
// to get another token to match hdfs/rpc behavior
|
||||
if (token != null) {
|
||||
LOG.debug("Using UGI token: " + token);
|
||||
canRefreshDelegationToken = false;
|
||||
} else {
|
||||
token = getDelegationToken(null);
|
||||
if (token != null) {
|
||||
LOG.debug("Fetched new token: " + token);
|
||||
} else { // security is disabled
|
||||
canRefreshDelegationToken = false;
|
||||
}
|
||||
}
|
||||
setDelegationToken(token);
|
||||
}
|
||||
return delegationToken;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
synchronized boolean replaceExpiredDelegationToken() throws IOException {
|
||||
boolean replaced = false;
|
||||
if (canRefreshDelegationToken) {
|
||||
Token<?> token = getDelegationToken(null);
|
||||
LOG.debug("Replaced expired token: " + token);
|
||||
setDelegationToken(token);
|
||||
replaced = (token != null);
|
||||
}
|
||||
return replaced;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getDefaultPort() {
|
||||
return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
||||
@ -288,8 +318,8 @@ private Path makeAbsolute(Path f) {
|
||||
final int code = conn.getResponseCode();
|
||||
// server is demanding an authentication we don't support
|
||||
if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
|
||||
throw new IOException(
|
||||
new AuthenticationException(conn.getResponseMessage()));
|
||||
// match hdfs/rpc exception
|
||||
throw new AccessControlException(conn.getResponseMessage());
|
||||
}
|
||||
if (code != op.getExpectedHttpResponseCode()) {
|
||||
final Map<?, ?> m;
|
||||
@ -309,7 +339,15 @@ private Path makeAbsolute(Path f) {
|
||||
return m;
|
||||
}
|
||||
|
||||
final RemoteException re = JsonUtil.toRemoteException(m);
|
||||
IOException re = JsonUtil.toRemoteException(m);
|
||||
// extract UGI-related exceptions and unwrap InvalidToken
|
||||
// the NN mangles these exceptions but the DN does not and may need
|
||||
// to re-fetch a token if either report the token is expired
|
||||
if (re.getMessage().startsWith("Failed to obtain user group information:")) {
|
||||
String[] parts = re.getMessage().split(":\\s+", 3);
|
||||
re = new RemoteException(parts[1], parts[2]);
|
||||
re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
|
||||
}
|
||||
throw unwrapException? toIOException(re): re;
|
||||
}
|
||||
return null;
|
||||
@ -369,7 +407,7 @@ Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
|
||||
// Skip adding delegation token for token operations because these
|
||||
// operations require authentication.
|
||||
Token<?> token = null;
|
||||
if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) {
|
||||
if (!op.getRequireAuth()) {
|
||||
token = getDelegationToken();
|
||||
}
|
||||
if (token != null) {
|
||||
@ -540,11 +578,17 @@ private T runWithRetry() throws IOException {
|
||||
validateResponse(op, conn, false);
|
||||
}
|
||||
return getResponse(conn);
|
||||
} catch (IOException ioe) {
|
||||
Throwable cause = ioe.getCause();
|
||||
if (cause != null && cause instanceof AuthenticationException) {
|
||||
throw ioe; // no retries for auth failures
|
||||
} catch (AccessControlException ace) {
|
||||
// no retries for auth failures
|
||||
throw ace;
|
||||
} catch (InvalidToken it) {
|
||||
// try to replace the expired token with a new one. the attempt
|
||||
// to acquire a new token must be outside this operation's retry
|
||||
// so if it fails after its own retries, this operation fails too.
|
||||
if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
|
||||
throw it;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
shouldRetry(ioe, retry);
|
||||
}
|
||||
}
|
||||
@ -712,6 +756,17 @@ public void close() throws IOException {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
|
||||
FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
|
||||
super(op, fspath, parameters);
|
||||
}
|
||||
@Override
|
||||
HttpURLConnection getResponse(final HttpURLConnection conn)
|
||||
throws IOException {
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by open() which tracks the resolved url itself
|
||||
@ -1077,16 +1132,41 @@ public FSDataInputStream open(final Path f, final int buffersize
|
||||
) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
||||
final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
|
||||
// use a runner so the open can recover from an invalid token
|
||||
FsPathConnectionRunner runner =
|
||||
new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
|
||||
return new FSDataInputStream(new OffsetUrlInputStream(
|
||||
new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
|
||||
new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
synchronized (this) {
|
||||
tokenAspect.removeRenewAction();
|
||||
public synchronized void close() throws IOException {
|
||||
try {
|
||||
if (canRefreshDelegationToken && delegationToken != null) {
|
||||
cancelDelegationToken(delegationToken);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.debug("Token cancel failed: "+ioe);
|
||||
} finally {
|
||||
super.close();
|
||||
}
|
||||
}
|
||||
|
||||
// use FsPathConnectionRunner to ensure retries for InvalidTokens
|
||||
class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
private final FsPathConnectionRunner runner;
|
||||
UnresolvedUrlOpener(FsPathConnectionRunner runner) {
|
||||
super(null);
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpURLConnection connect(long offset, boolean resolved)
|
||||
throws IOException {
|
||||
assert offset == 0;
|
||||
HttpURLConnection conn = runner.run();
|
||||
setURL(conn.getURL());
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1139,7 +1219,7 @@ static URL removeOffsetParam(final URL url) throws MalformedURLException {
|
||||
}
|
||||
|
||||
static class OffsetUrlInputStream extends ByteRangeInputStream {
|
||||
OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r)
|
||||
OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
|
||||
throws IOException {
|
||||
super(o, r);
|
||||
}
|
||||
|
@ -13,3 +13,5 @@
|
||||
#
|
||||
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier
|
||||
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
|
||||
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$WebHdfsDelegationTokenIdentifier
|
||||
org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$SWebHdfsDelegationTokenIdentifier
|
||||
|
@ -19,47 +19,63 @@
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
||||
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
public class TestWebHdfsTokens {
|
||||
private static Configuration conf;
|
||||
URI uri = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
conf = new Configuration();
|
||||
SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
UserGroupInformation.setLoginUser(
|
||||
UserGroupInformation.createUserForTesting(
|
||||
"LoginUser", new String[]{"supergroup"}));
|
||||
}
|
||||
|
||||
private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException {
|
||||
WebHdfsFileSystem fsOrig = new WebHdfsFileSystem();
|
||||
fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
|
||||
WebHdfsFileSystem fs = spy(fsOrig);
|
||||
Whitebox.setInternalState(fsOrig.tokenAspect, "fs", fs);
|
||||
return fs;
|
||||
}
|
||||
|
||||
@ -89,7 +105,7 @@ public void testNoTokenForGetToken() throws IOException {
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testNoTokenForCanclToken() throws IOException {
|
||||
public void testNoTokenForRenewToken() throws IOException {
|
||||
checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
|
||||
}
|
||||
|
||||
@ -139,4 +155,277 @@ public void testDeleteOpRequireAuth() {
|
||||
assertFalse(op.getRequireAuth());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // for any(Token.class)
|
||||
@Test
|
||||
public void testLazyTokenFetchForWebhdfs() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
WebHdfsFileSystem fs = null;
|
||||
try {
|
||||
final Configuration clusterConf = new HdfsConfiguration(conf);
|
||||
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
|
||||
clusterConf.setBoolean(DFSConfigKeys
|
||||
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
|
||||
// trick the NN into thinking security is enabled w/o it trying
|
||||
// to login from a keytab
|
||||
UserGroupInformation.setConfiguration(clusterConf);
|
||||
cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
|
||||
UserGroupInformation.setConfiguration(clusterConf);
|
||||
|
||||
uri = DFSUtil.createUri(
|
||||
"webhdfs", cluster.getNameNode().getHttpAddress());
|
||||
validateLazyTokenFetch(clusterConf);
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // for any(Token.class)
|
||||
@Test
|
||||
public void testLazyTokenFetchForSWebhdfs() throws Exception {
|
||||
MiniDFSCluster cluster = null;
|
||||
SWebHdfsFileSystem fs = null;
|
||||
try {
|
||||
final Configuration clusterConf = new HdfsConfiguration(conf);
|
||||
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
|
||||
clusterConf.setBoolean(DFSConfigKeys
|
||||
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
|
||||
String BASEDIR = System.getProperty("test.build.dir",
|
||||
"target/test-dir") + "/" + TestWebHdfsTokens.class.getSimpleName();
|
||||
String keystoresDir;
|
||||
String sslConfDir;
|
||||
|
||||
clusterConf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
clusterConf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
clusterConf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
|
||||
File base = new File(BASEDIR);
|
||||
FileUtil.fullyDelete(base);
|
||||
base.mkdirs();
|
||||
keystoresDir = new File(BASEDIR).getAbsolutePath();
|
||||
sslConfDir = KeyStoreTestUtil.getClasspathDir(TestWebHdfsTokens.class);
|
||||
KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, clusterConf, false);
|
||||
|
||||
// trick the NN into thinking security is enabled w/o it trying
|
||||
// to login from a keytab
|
||||
UserGroupInformation.setConfiguration(clusterConf);
|
||||
cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
InetSocketAddress addr = cluster.getNameNode().getHttpsAddress();
|
||||
String nnAddr = NetUtils.getHostPortString(addr);
|
||||
clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr);
|
||||
SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
|
||||
UserGroupInformation.setConfiguration(clusterConf);
|
||||
|
||||
uri = DFSUtil.createUri(
|
||||
"swebhdfs", cluster.getNameNode().getHttpsAddress());
|
||||
validateLazyTokenFetch(clusterConf);
|
||||
} finally {
|
||||
IOUtils.cleanup(null, fs);
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{
|
||||
final String testUser = "DummyUser";
|
||||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
testUser, new String[]{"supergroup"});
|
||||
WebHdfsFileSystem fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
||||
@Override
|
||||
public WebHdfsFileSystem run() throws IOException {
|
||||
return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
|
||||
}
|
||||
});
|
||||
// verify token ops don't get a token
|
||||
Assert.assertNull(fs.getRenewToken());
|
||||
Token<?> token = fs.getDelegationToken(null);
|
||||
fs.renewDelegationToken(token);
|
||||
fs.cancelDelegationToken(token);
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
Assert.assertNull(fs.getRenewToken());
|
||||
reset(fs);
|
||||
|
||||
// verify first non-token op gets a token
|
||||
final Path p = new Path("/f");
|
||||
fs.create(p, (short)1).close();
|
||||
verify(fs, times(1)).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(anyString());
|
||||
verify(fs, times(1)).setDelegationToken(any(Token.class));
|
||||
token = fs.getRenewToken();
|
||||
Assert.assertNotNull(token);
|
||||
Assert.assertEquals(testUser, getTokenOwner(token));
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
reset(fs);
|
||||
|
||||
// verify prior token is reused
|
||||
fs.getFileStatus(p);
|
||||
verify(fs, times(1)).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
Token<?> token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify renew of expired token fails w/o getting a new token
|
||||
token = fs.getRenewToken();
|
||||
fs.cancelDelegationToken(token);
|
||||
try {
|
||||
fs.renewDelegationToken(token);
|
||||
Assert.fail("should have failed");
|
||||
} catch (InvalidToken it) {
|
||||
} catch (Exception ex) {
|
||||
Assert.fail("wrong exception:"+ex);
|
||||
}
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify cancel of expired token fails w/o getting a new token
|
||||
try {
|
||||
fs.cancelDelegationToken(token);
|
||||
Assert.fail("should have failed");
|
||||
} catch (InvalidToken it) {
|
||||
} catch (Exception ex) {
|
||||
Assert.fail("wrong exception:"+ex);
|
||||
}
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify an expired token is replaced with a new token
|
||||
fs.open(p).close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(null);
|
||||
verify(fs, times(1)).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertNotSame(token, token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertEquals(testUser, getTokenOwner(token2));
|
||||
reset(fs);
|
||||
|
||||
// verify with open because it's a little different in how it
|
||||
// opens connections
|
||||
fs.cancelDelegationToken(fs.getRenewToken());
|
||||
InputStream is = fs.open(p);
|
||||
is.read();
|
||||
is.close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(null);
|
||||
verify(fs, times(1)).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertNotSame(token, token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertEquals(testUser, getTokenOwner(token2));
|
||||
reset(fs);
|
||||
|
||||
// verify fs close cancels the token
|
||||
fs.close();
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
verify(fs, times(1)).cancelDelegationToken(eq(token2));
|
||||
|
||||
// add a token to ugi for a new fs, verify it uses that token
|
||||
token = fs.getDelegationToken(null);
|
||||
ugi.addToken(token);
|
||||
fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
||||
@Override
|
||||
public WebHdfsFileSystem run() throws IOException {
|
||||
return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
|
||||
}
|
||||
});
|
||||
Assert.assertNull(fs.getRenewToken());
|
||||
fs.getFileStatus(new Path("/"));
|
||||
verify(fs, times(1)).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, times(1)).setDelegationToken(eq(token));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify it reuses the prior ugi token
|
||||
fs.getFileStatus(new Path("/"));
|
||||
verify(fs, times(1)).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
|
||||
// verify an expired ugi token is NOT replaced with a new token
|
||||
fs.cancelDelegationToken(token);
|
||||
for (int i=0; i<2; i++) {
|
||||
try {
|
||||
fs.getFileStatus(new Path("/"));
|
||||
Assert.fail("didn't fail");
|
||||
} catch (InvalidToken it) {
|
||||
} catch (Exception ex) {
|
||||
Assert.fail("wrong exception:"+ex);
|
||||
}
|
||||
verify(fs, times(1)).getDelegationToken();
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
token2 = fs.getRenewToken();
|
||||
Assert.assertNotNull(token2);
|
||||
Assert.assertEquals(fs.getTokenKind(), token.getKind());
|
||||
Assert.assertSame(token, token2);
|
||||
reset(fs);
|
||||
}
|
||||
|
||||
// verify fs close does NOT cancel the ugi token
|
||||
fs.close();
|
||||
verify(fs, never()).getDelegationToken();
|
||||
verify(fs, never()).replaceExpiredDelegationToken();
|
||||
verify(fs, never()).getDelegationToken(anyString());
|
||||
verify(fs, never()).setDelegationToken(any(Token.class));
|
||||
verify(fs, never()).cancelDelegationToken(any(Token.class));
|
||||
}
|
||||
|
||||
private String getTokenOwner(Token<?> token) throws IOException {
|
||||
// webhdfs doesn't register properly with the class loader
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
Token<?> clone = new Token(token);
|
||||
clone.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
|
||||
return clone.decodeIdentifier().getUser().getUserName();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user