HDFS-9525. hadoop utilities need to support provided delegation tokens (HeeSoo Kim via aw)

This commit is contained in:
Allen Wittenauer 2015-12-08 12:56:38 -08:00
parent 9f50e13d5d
commit 832b3cbde1
10 changed files with 218 additions and 49 deletions

View File

@ -311,6 +311,9 @@ public class CommonConfigurationKeysPublic {
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_SECURITY_DNS_NAMESERVER_KEY =
"hadoop.security.dns.nameserver";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_TOKEN_FILES =
"hadoop.token.files";
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
public static final String HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN =

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_TOKEN_FILES;
import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
import java.io.File;
@ -248,9 +249,9 @@ public boolean logout() throws LoginException {
/**Environment variable pointing to the token cache file*/
public static final String HADOOP_TOKEN_FILE_LOCATION =
"HADOOP_TOKEN_FILE_LOCATION";
/**
"HADOOP_TOKEN_FILE_LOCATION";
/**
* A method to initialize the fields that depend on a configuration.
* Must be called before useKerberos or groups is used.
*/
@ -821,6 +822,26 @@ static void loginUserFromSubject(Subject subject) throws IOException {
}
loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
String tokenFileLocation = System.getProperty(HADOOP_TOKEN_FILES);
if (tokenFileLocation == null) {
tokenFileLocation = conf.get(HADOOP_TOKEN_FILES);
}
if (tokenFileLocation != null) {
String[] tokenFileNames = tokenFileLocation.split("\\s*,\\s*+");
for (String tokenFileName: tokenFileNames) {
if (tokenFileName.length() > 0) {
File tokenFile = new File(tokenFileName);
if (tokenFile.exists() && tokenFile.isFile()) {
Credentials cred = Credentials.readTokenStorageFile(
tokenFile, conf);
loginUser.addCredentials(cred);
} else {
LOG.info("tokenFile("+tokenFileName+") does not exist");
}
}
}
}
String fileLocation = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
// Load the token storage file and put all of the tokens into the

View File

@ -466,6 +466,12 @@ for ldap providers in the same way as above does.
<description>Maps kerberos principals to local user names</description>
</property>
<property>
<name>hadoop.token.files</name>
<value></value>
<description>List of token cache files that have delegation tokens for hadoop service</description>
</property>
<!-- i/o properties -->
<property>
<name>io.file.buffer.size</name>

View File

@ -18,6 +18,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
@ -35,6 +36,7 @@
import javax.security.auth.login.LoginContext;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
@ -861,7 +863,7 @@ public void testPrivateTokenExclusion() throws Exception {
// Ensure only non-private tokens are returned
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
assertEquals(1, tokens.size());
assertEquals(3, tokens.size());
}
/**
@ -928,4 +930,46 @@ public void run() {
}
}
}
@Test (timeout = 30000)
public void testExternalTokenFiles() throws Exception {
StringBuilder tokenFullPathnames = new StringBuilder();
String tokenFilenames = "token1,token2";
String tokenFiles[] = tokenFilenames.split("\\s*,\\s*+");
final File testDir = new File("target",
TestUserGroupInformation.class.getName() + "-tmpDir").getAbsoluteFile();
String testDirPath = testDir.getAbsolutePath();
// create path for token files
for (String tokenFile: tokenFiles) {
if (tokenFullPathnames.length() > 0) {
tokenFullPathnames.append(",");
}
tokenFullPathnames.append(testDirPath).append("/").append(tokenFile);
}
// create new token and store it
TestTokenIdentifier tokenId = new TestTokenIdentifier();
Credentials cred1 = new Credentials();
Token<TestTokenIdentifier> token1 = new Token<TestTokenIdentifier>(
tokenId.getBytes(), "password".getBytes(),
tokenId.getKind(), new Text("token-service1"));
cred1.addToken(token1.getService(), token1);
cred1.writeTokenStorageFile(new Path(testDirPath, tokenFiles[0]), conf);
Credentials cred2 = new Credentials();
Token<TestTokenIdentifier> token2 = new Token<TestTokenIdentifier>(
tokenId.getBytes(), "password".getBytes(),
tokenId.getKind(), new Text("token-service2"));
cred2.addToken(token2.getService(), token2);
cred2.writeTokenStorageFile(new Path(testDirPath, tokenFiles[1]), conf);
// set property for token external token files
System.setProperty("hadoop.token.files", tokenFullPathnames.toString());
UserGroupInformation.setLoginUser(null);
UserGroupInformation tokenUgi = UserGroupInformation.getLoginUser();
Collection<Token<?>> credsugiTokens = tokenUgi.getTokens();
assertTrue(credsugiTokens.contains(token1));
assertTrue(credsugiTokens.contains(token2));
}
}

View File

@ -231,7 +231,7 @@ public URI getCanonicalUri() {
// the first getAuthParams() for a non-token op will either get the
// internal token from the ugi or lazy fetch one
protected synchronized Token<?> getDelegationToken() throws IOException {
if (canRefreshDelegationToken && delegationToken == null) {
if (delegationToken == null) {
Token<?> token = tokenSelector.selectToken(
new Text(getCanonicalServiceName()), ugi.getTokens());
// ugi tokens are usually indicative of a task which can't
@ -241,11 +241,13 @@ protected synchronized Token<?> getDelegationToken() throws IOException {
LOG.debug("Using UGI token: {}", token);
canRefreshDelegationToken = false;
} else {
token = getDelegationToken(null);
if (token != null) {
LOG.debug("Fetched new token: {}", token);
} else { // security is disabled
canRefreshDelegationToken = false;
if (canRefreshDelegationToken) {
token = getDelegationToken(null);
if (token != null) {
LOG.debug("Fetched new token: {}", token);
} else { // security is disabled
canRefreshDelegationToken = false;
}
}
}
setDelegationToken(token);
@ -257,6 +259,7 @@ protected synchronized Token<?> getDelegationToken() throws IOException {
synchronized boolean replaceExpiredDelegationToken() throws IOException {
boolean replaced = false;
if (canRefreshDelegationToken) {
this.delegationToken = null;
Token<?> token = getDelegationToken(null);
LOG.debug("Replaced expired token: {}", token);
setDelegationToken(token);
@ -1346,7 +1349,7 @@ public FileStatus[] listStatus(final Path f) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
return new FsPathResponseRunner<FileStatus[]>(op, f) {
@Override
FileStatus[] decodeResponse(Map<?,?> json) {
FileStatus[] decodeResponse(Map<?, ?> json) {
final Map<?, ?> rootmap =
(Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
final List<?> array = JsonUtilClient.getList(rootmap,
@ -1367,18 +1370,34 @@ FileStatus[] decodeResponse(Map<?,?> json) {
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
public synchronized Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
Token<DelegationTokenIdentifier> token =
new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
op, null, new RenewerParam(renewer)) {
Token<DelegationTokenIdentifier> token = null;
if (delegationToken == null) {
token =
new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
op, null, new RenewerParam(renewer)) {
@Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
throws IOException {
return JsonUtilClient.toDelegationToken(json);
}
}.run();
} else {
token =
new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
op, null, new RenewerParam(renewer),
new DelegationParam(delegationToken.encodeToUrlString())) {
@Override
Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
throws IOException {
return JsonUtilClient.toDelegationToken(json);
}
}.run();
}
if (token != null) {
token.setService(tokenServiceName);
} else {
@ -1406,13 +1425,26 @@ public <T extends TokenIdentifier> void setDelegationToken(
public synchronized long renewDelegationToken(final Token<?> token
) throws IOException {
final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
return new FsPathResponseRunner<Long>(op, null,
new TokenArgumentParam(token.encodeToUrlString())) {
@Override
Long decodeResponse(Map<?,?> json) throws IOException {
return ((Number) json.get("long")).longValue();
}
}.run();
if (delegationToken == null) {
return new FsPathResponseRunner<Long>(op, null,
new TokenArgumentParam(token.encodeToUrlString())) {
@Override
Long decodeResponse(Map<?, ?> json) throws IOException {
return ((Number) json.get("long")).longValue();
}
}.run();
} else {
return new FsPathResponseRunner<Long>(op, null,
new TokenArgumentParam(token.encodeToUrlString()),
new DelegationParam(delegationToken.encodeToUrlString())) {
@Override
Long decodeResponse(Map<?, ?> json) throws IOException {
return ((Number) json.get("long")).longValue();
}
}.run();
}
}
@Override

View File

@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.web.resources;
import org.apache.hadoop.security.UserGroupInformation;
/** Represents delegation token used for authentication. */
public class DelegationParam extends StringParam {
/** Parameter name. */
@ -33,8 +31,8 @@ public class DelegationParam extends StringParam {
* @param str a string representation of the parameter value.
*/
public DelegationParam(final String str) {
super(DOMAIN, UserGroupInformation.isSecurityEnabled()
&& str != null && !str.equals(DEFAULT)? str: null);
super(DOMAIN, str != null && !str.equals(DEFAULT)? str: null);
}
@Override

View File

@ -55,6 +55,9 @@ Trunk (Unreleased)
HDFS-9057. allow/disallow snapshots via webhdfs
(Bramma Reddy Battula via vinayakumarb)
HDFS-9525. hadoop utilities need to support provided delegation tokens
(HeeSoo Kim via aw)
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.

View File

@ -895,10 +895,6 @@ private Response get(
}
case GETDELEGATIONTOKEN:
{
if (delegation.getValue() != null) {
throw new IllegalArgumentException(delegation.getName()
+ " parameter is not null.");
}
final Token<? extends TokenIdentifier> token = generateDelegationToken(
namenode, ugi, renewer.getValue());

View File

@ -296,7 +296,59 @@ Token<DelegationTokenIdentifier> decodeResponse(Map<?, ?> json)
}
}
}
@Test
public void testReuseToken() throws Exception {
MiniDFSCluster cluster = null;
UserGroupInformation loginUgi = UserGroupInformation.createUserForTesting(
"LoginUser", new String[]{"supergroup"});
try {
final Configuration clusterConf = new HdfsConfiguration(conf);
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
clusterConf.setBoolean(DFSConfigKeys
.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
UserGroupInformation.setConfiguration(clusterConf);
UserGroupInformation.setLoginUser(loginUgi);
cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(0).build();
cluster.waitActive();
/* create SIMPLE client connection */
SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
UserGroupInformation.setConfiguration(clusterConf);
UserGroupInformation simpleUgi = UserGroupInformation.createUserForTesting(
"testUser", new String[]{"supergroup"});
final WebHdfsFileSystem simpleFs = WebHdfsTestUtil.getWebHdfsFileSystemAs
(simpleUgi, clusterConf, "webhdfs");
/* create KERBEROS client connection */
SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
UserGroupInformation.setConfiguration(clusterConf);
UserGroupInformation krbUgi = UserGroupInformation.createUserForTesting(
"testUser", new String[]{"supergroup"});
final WebHdfsFileSystem krbFs = WebHdfsTestUtil.getWebHdfsFileSystemAs
(krbUgi, clusterConf, "webhdfs");
// 1. Get initial token through kerberos client connection
Token<DelegationTokenIdentifier> krbToken
= krbFs.getDelegationToken(null);
Assert.assertNotNull(krbToken);
// 2. Get token with previous token which gets from kerberos connection
// through SIMPLE client connection.
simpleFs.setDelegationToken(krbToken);
Token<?> simpleToken = simpleFs.getDelegationToken();
Assert.assertNotNull(simpleToken);
Assert.assertEquals(krbToken.getService(), simpleToken.getService());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@SuppressWarnings("unchecked")
private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception{
final String testUser = "DummyUser";
@ -308,16 +360,6 @@ public WebHdfsFileSystem run() throws IOException {
return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
}
});
// verify token ops don't get a token
Assert.assertNull(fs.getRenewToken());
Token<?> token = fs.getDelegationToken(null);
fs.renewDelegationToken(token);
fs.cancelDelegationToken(token);
verify(fs, never()).getDelegationToken();
verify(fs, never()).replaceExpiredDelegationToken();
verify(fs, never()).setDelegationToken(any(Token.class));
Assert.assertNull(fs.getRenewToken());
reset(fs);
// verify first non-token op gets a token
final Path p = new Path("/f");
@ -326,8 +368,8 @@ public WebHdfsFileSystem run() throws IOException {
verify(fs, never()).replaceExpiredDelegationToken();
verify(fs, times(1)).getDelegationToken(anyString());
verify(fs, times(1)).setDelegationToken(any(Token.class));
token = fs.getRenewToken();
Assert.assertNotNull(token);
Token<?> token = fs.getRenewToken();
Assert.assertNotNull(token);
Assert.assertEquals(testUser, getTokenOwner(token));
Assert.assertEquals(fs.getTokenKind(), token.getKind());
reset(fs);
@ -421,6 +463,7 @@ public WebHdfsFileSystem run() throws IOException {
verify(fs, times(1)).cancelDelegationToken(eq(token2));
// add a token to ugi for a new fs, verify it uses that token
fs.setDelegationToken(null);
token = fs.getDelegationToken(null);
ugi.addToken(token);
fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {

View File

@ -195,9 +195,33 @@ public void testSecureAuthParamsInUrl() throws IOException {
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getShortUserName()).toString()
new DelegationParam(tokenString).toString()
},
fileStatusUrl);
fileStatusUrl);
// send user with delegationToken
getTokenUrl = webhdfs.toUrl(GetOpParam.Op.GETDELEGATIONTOKEN,
fsPath, new DelegationParam(tokenString));
checkQueryParams(
new String[]{
GetOpParam.Op.GETDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new DelegationParam(tokenString).toString()
},
getTokenUrl);
// send user with delegationToken
renewTokenUrl = webhdfs.toUrl(PutOpParam.Op.RENEWDELEGATIONTOKEN,
fsPath, new TokenArgumentParam(tokenString),
new DelegationParam(tokenString));
checkQueryParams(
new String[]{
PutOpParam.Op.RENEWDELEGATIONTOKEN.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
new TokenArgumentParam(tokenString).toString(),
new DelegationParam(tokenString).toString()
},
renewTokenUrl);
}
@Test(timeout=60000)
@ -274,14 +298,13 @@ public void testSecureProxyAuthParamsInUrl() throws IOException {
new TokenArgumentParam(tokenString).toString()
},
cancelTokenUrl);
// send real+effective
fileStatusUrl = webhdfs.toUrl(GetOpParam.Op.GETFILESTATUS, fsPath);
checkQueryParams(
new String[]{
GetOpParam.Op.GETFILESTATUS.toQueryString(),
new UserParam(ugi.getRealUser().getShortUserName()).toString(),
new DoAsParam(ugi.getShortUserName()).toString()
new DelegationParam(tokenString).toString()
},
fileStatusUrl);
}