HADOOP-7967. Need generalized multi-token filesystem support (daryn)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1374271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ffbadbdd36
commit
8fa10b184e
@ -806,6 +806,8 @@ Release 0.23.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
||||
HADOOP-7967. Need generalized multi-token filesystem support (daryn)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
@ -21,6 +21,7 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
@ -217,6 +218,6 @@ public String getCanonicalServiceName() {
|
||||
|
||||
@Override //AbstractFileSystem
|
||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||
return fsImpl.getDelegationTokens(renewer);
|
||||
return Arrays.asList(fsImpl.addDelegationTokens(renewer, null));
|
||||
}
|
||||
}
|
@ -110,7 +110,11 @@ private boolean renew() throws IOException, InterruptedException {
|
||||
fs.getRenewToken().renew(fs.getConf());
|
||||
} catch (IOException ie) {
|
||||
try {
|
||||
fs.setDelegationToken(fs.getDelegationTokens(null).get(0));
|
||||
Token<?>[] tokens = fs.addDelegationTokens(null, null);
|
||||
if (tokens.length == 0) {
|
||||
throw new IOException("addDelegationTokens returned no tokens");
|
||||
}
|
||||
fs.setDelegationToken(tokens[0]);
|
||||
} catch (IOException ie2) {
|
||||
throw new IOException("Can't renew or get new delegation token ", ie);
|
||||
}
|
||||
|
@ -48,6 +48,7 @@
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
@ -57,6 +58,8 @@
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/****************************************************************
|
||||
* An abstract base class for a fairly generic filesystem. It
|
||||
* may be implemented as a distributed filesystem, or as a "local"
|
||||
@ -222,15 +225,25 @@ protected int getDefaultPort() {
|
||||
|
||||
/**
|
||||
* Get a canonical service name for this file system. The token cache is
|
||||
* the only user of this value, and uses it to lookup this filesystem's
|
||||
* service tokens. The token cache will not attempt to acquire tokens if the
|
||||
* service is null.
|
||||
* the only user of the canonical service name, and uses it to lookup this
|
||||
* filesystem's service tokens.
|
||||
* If file system provides a token of its own then it must have a canonical
|
||||
* name, otherwise canonical name can be null.
|
||||
*
|
||||
* Default Impl: If the file system has child file systems
|
||||
* (such as an embedded file system) then it is assumed that the fs has no
|
||||
* tokens of its own and hence returns a null name; otherwise a service
|
||||
* name is built using Uri and port.
|
||||
*
|
||||
* @return a service string that uniquely identifies this file system, null
|
||||
* if the filesystem does not implement tokens
|
||||
* @see SecurityUtil#buildDTServiceName(URI, int)
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
|
||||
public String getCanonicalServiceName() {
|
||||
return SecurityUtil.buildDTServiceName(getUri(), getDefaultPort());
|
||||
return (getChildFileSystems() == null)
|
||||
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
|
||||
: null;
|
||||
}
|
||||
|
||||
/** @deprecated call #getUri() instead.*/
|
||||
@ -396,68 +409,95 @@ public Path makeQualified(Path path) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Deprecated - use @link {@link #getDelegationTokens(String)}
|
||||
* Get a new delegation token for this file system.
|
||||
* This is an internal method that should have been declared protected
|
||||
* but wasn't historically.
|
||||
* Callers should use {@link #addDelegationTokens(String, Credentials)}
|
||||
*
|
||||
* @param renewer the account name that is allowed to renew the token.
|
||||
* @return a new delegation token
|
||||
* @throws IOException
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
|
||||
@Deprecated
|
||||
@InterfaceAudience.Private()
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get one or more delegation tokens associated with the filesystem. Normally
|
||||
* a file system returns a single delegation token. A file system that manages
|
||||
* multiple file systems underneath, could return set of delegation tokens for
|
||||
* all the file systems it manages.
|
||||
* Obtain all delegation tokens used by this FileSystem that are not
|
||||
* already present in the given Credentials. Existing tokens will neither
|
||||
* be verified as valid nor having the given renewer. Missing tokens will
|
||||
* be acquired and added to the given Credentials.
|
||||
*
|
||||
* @param renewer the account name that is allowed to renew the token.
|
||||
* Default Impl: works for simple fs with its own token
|
||||
* and also for an embedded fs whose tokens are those of its
|
||||
* children file system (i.e. the embedded fs has not tokens of its
|
||||
* own).
|
||||
*
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add new delegation tokens
|
||||
* @return list of new delegation tokens
|
||||
* If delegation tokens not supported then return a list of size zero.
|
||||
* @throws IOException
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate( { "HDFS", "MapReduce" })
|
||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||
return new ArrayList<Token<?>>(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #getDelegationTokens(String)
|
||||
* This is similar to getDelegationTokens, with the added restriction that if
|
||||
* a token is already present in the passed Credentials object - that token
|
||||
* is returned instead of a new delegation token.
|
||||
*
|
||||
* If the token is found to be cached in the Credentials object, this API does
|
||||
* not verify the token validity or the passed in renewer.
|
||||
*
|
||||
*
|
||||
* @param renewer the account name that is allowed to renew the token.
|
||||
* @param credentials a Credentials object containing already knowing
|
||||
* delegationTokens.
|
||||
* @return a list of delegation tokens.
|
||||
* @throws IOException
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" })
|
||||
public List<Token<?>> getDelegationTokens(String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
List<Token<?>> allTokens = getDelegationTokens(renewer);
|
||||
List<Token<?>> newTokens = new ArrayList<Token<?>>();
|
||||
if (allTokens != null) {
|
||||
for (Token<?> token : allTokens) {
|
||||
Token<?> knownToken = credentials.getToken(token.getService());
|
||||
if (knownToken == null) {
|
||||
newTokens.add(token);
|
||||
} else {
|
||||
newTokens.add(knownToken);
|
||||
public Token<?>[] addDelegationTokens(
|
||||
final String renewer, Credentials credentials) throws IOException {
|
||||
if (credentials == null) {
|
||||
credentials = new Credentials();
|
||||
}
|
||||
final List<Token<?>> tokens = new ArrayList<Token<?>>();
|
||||
collectDelegationTokens(renewer, credentials, tokens);
|
||||
return tokens.toArray(new Token<?>[tokens.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively obtain the tokens for this FileSystem and all descended
|
||||
* FileSystems as determined by getChildFileSystems().
|
||||
* @param renewer the user allowed to renew the delegation tokens
|
||||
* @param credentials cache in which to add the new delegation tokens
|
||||
* @param tokens list in which to add acquired tokens
|
||||
* @throws IOException
|
||||
*/
|
||||
private void collectDelegationTokens(final String renewer,
|
||||
final Credentials credentials,
|
||||
final List<Token<?>> tokens)
|
||||
throws IOException {
|
||||
final String serviceName = getCanonicalServiceName();
|
||||
// Collect token of the this filesystem and then of its embedded children
|
||||
if (serviceName != null) { // fs has token, grab it
|
||||
final Text service = new Text(serviceName);
|
||||
Token<?> token = credentials.getToken(service);
|
||||
if (token == null) {
|
||||
token = getDelegationToken(renewer);
|
||||
if (token != null) {
|
||||
tokens.add(token);
|
||||
credentials.addToken(service, token);
|
||||
}
|
||||
}
|
||||
}
|
||||
return newTokens;
|
||||
// Now collect the tokens from the children
|
||||
final FileSystem[] children = getChildFileSystems();
|
||||
if (children != null) {
|
||||
for (final FileSystem fs : children) {
|
||||
fs.collectDelegationTokens(renewer, credentials, tokens);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all the immediate child FileSystems embedded in this FileSystem.
|
||||
* It does not recurse and get grand children. If a FileSystem
|
||||
* has multiple child FileSystems, then it should return a unique list
|
||||
* of those FileSystems. Default is to return null to signify no children.
|
||||
*
|
||||
* @return FileSystems used by this FileSystem
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({ "HDFS" })
|
||||
@VisibleForTesting
|
||||
public FileSystem[] getChildFileSystems() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/** create a file with the provided permission
|
||||
* The permission of the file is set to be the provided permission as in
|
||||
* setPermission, not permission&~umask
|
||||
|
@ -22,15 +22,11 @@
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
/****************************************************************
|
||||
@ -428,25 +424,7 @@ protected boolean primitiveMkdir(Path f, FsPermission abdolutePermission)
|
||||
}
|
||||
|
||||
@Override // FileSystem
|
||||
public String getCanonicalServiceName() {
|
||||
return fs.getCanonicalServiceName();
|
||||
}
|
||||
|
||||
@Override // FileSystem
|
||||
@SuppressWarnings("deprecation")
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return fs.getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
@Override // FileSystem
|
||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||
return fs.getDelegationTokens(renewer);
|
||||
}
|
||||
|
||||
@Override
|
||||
// FileSystem
|
||||
public List<Token<?>> getDelegationTokens(String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
return fs.getDelegationTokens(renewer, credentials);
|
||||
public FileSystem[] getChildFileSystems() {
|
||||
return new FileSystem[]{fs};
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,7 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -49,11 +49,8 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.viewfs.InodeTree.INode;
|
||||
import org.apache.hadoop.fs.viewfs.InodeTree.INodeLink;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
@ -235,11 +232,6 @@ public Path getTrashCanLocation(final Path f) throws FileNotFoundException {
|
||||
return res.isInternalDir() ? null : res.targetFileSystem.getHomeDirectory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return myUri;
|
||||
@ -549,6 +541,18 @@ public void setWriteChecksum(final boolean writeChecksum) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileSystem[] getChildFileSystems() {
|
||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||
fsState.getMountPoints();
|
||||
Set<FileSystem> children = new HashSet<FileSystem>();
|
||||
for (InodeTree.MountPoint<FileSystem> mountPoint : mountPoints) {
|
||||
FileSystem targetFs = mountPoint.target.targetFileSystem;
|
||||
children.addAll(Arrays.asList(targetFs.getChildFileSystems()));
|
||||
}
|
||||
return children.toArray(new FileSystem[]{});
|
||||
}
|
||||
|
||||
public MountPoint[] getMountPoints() {
|
||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||
fsState.getMountPoints();
|
||||
@ -561,59 +565,6 @@ public MountPoint[] getMountPoints() {
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||
fsState.getMountPoints();
|
||||
int initialListSize = 0;
|
||||
for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
|
||||
initialListSize += im.target.targetDirLinkList.length;
|
||||
}
|
||||
List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
|
||||
for ( int i = 0; i < mountPoints.size(); ++i ) {
|
||||
List<Token<?>> tokens =
|
||||
mountPoints.get(i).target.targetFileSystem.getDelegationTokens(renewer);
|
||||
if (tokens != null) {
|
||||
result.addAll(tokens);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Token<?>> getDelegationTokens(String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
List<InodeTree.MountPoint<FileSystem>> mountPoints =
|
||||
fsState.getMountPoints();
|
||||
int initialListSize = 0;
|
||||
for (InodeTree.MountPoint<FileSystem> im : mountPoints) {
|
||||
initialListSize += im.target.targetDirLinkList.length;
|
||||
}
|
||||
Set<String> seenServiceNames = new HashSet<String>();
|
||||
List<Token<?>> result = new ArrayList<Token<?>>(initialListSize);
|
||||
for (int i = 0; i < mountPoints.size(); ++i) {
|
||||
String serviceName =
|
||||
mountPoints.get(i).target.targetFileSystem.getCanonicalServiceName();
|
||||
if (serviceName == null || seenServiceNames.contains(serviceName)) {
|
||||
continue;
|
||||
}
|
||||
seenServiceNames.add(serviceName);
|
||||
Token<?> knownToken = credentials.getToken(new Text(serviceName));
|
||||
if (knownToken != null) {
|
||||
result.add(knownToken);
|
||||
} else {
|
||||
List<Token<?>> tokens =
|
||||
mountPoints.get(i).target.targetFileSystem
|
||||
.getDelegationTokens(renewer);
|
||||
if (tokens != null) {
|
||||
result.addAll(tokens);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* An instance of this class represents an internal dir of the viewFs
|
||||
* that is internal dir of the mount table.
|
||||
|
@ -56,6 +56,20 @@ public class Credentials implements Writable {
|
||||
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
|
||||
new HashMap<Text, Token<? extends TokenIdentifier>>();
|
||||
|
||||
/**
|
||||
* Create an empty credentials instance
|
||||
*/
|
||||
public Credentials() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a copy of the given credentials
|
||||
* @param credentials to copy
|
||||
*/
|
||||
public Credentials(Credentials credentials) {
|
||||
this.addAll(credentials);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the key bytes for the alias
|
||||
* @param alias the alias for the key
|
||||
|
@ -24,8 +24,10 @@
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
/**
|
||||
* Helper class for unit tests.
|
||||
@ -218,4 +220,39 @@ public static void checkFileStatus(FileSystem aFs, String path,
|
||||
}
|
||||
Assert.assertEquals(aFs.makeQualified(new Path(path)), s.getPath());
|
||||
}
|
||||
|
||||
/**
|
||||
* Class to enable easier mocking of a FileSystem
|
||||
* Use getRawFileSystem to retrieve the mock
|
||||
*/
|
||||
public static class MockFileSystem extends FilterFileSystem {
|
||||
public MockFileSystem() {
|
||||
// it's a bit ackward to mock ourselves, but it allows the visibility
|
||||
// of methods to be increased
|
||||
super(mock(MockFileSystem.class));
|
||||
}
|
||||
@Override
|
||||
public MockFileSystem getRawFileSystem() {
|
||||
return (MockFileSystem) super.getRawFileSystem();
|
||||
|
||||
}
|
||||
// these basic methods need to directly propagate to the mock to be
|
||||
// more transparent
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration conf) throws IOException {
|
||||
fs.initialize(uri, conf);
|
||||
}
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return fs.getCanonicalServiceName();
|
||||
}
|
||||
@Override
|
||||
public FileSystem[] getChildFileSystems() {
|
||||
return fs.getChildFileSystems();
|
||||
}
|
||||
@Override // publicly expose for mocking
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
return fs.getDelegationToken(renewer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,279 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestFileSystemTokens {
|
||||
private static String renewer = "renewer!";
|
||||
|
||||
@Test
|
||||
public void testFsWithNoToken() throws Exception {
|
||||
MockFileSystem fs = createFileSystemForServiceName(null);
|
||||
Credentials credentials = new Credentials();
|
||||
|
||||
fs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(fs, false);
|
||||
assertEquals(0, credentials.numberOfTokens());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithToken() throws Exception {
|
||||
Text service = new Text("singleTokenFs");
|
||||
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||
Credentials credentials = new Credentials();
|
||||
|
||||
fs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(fs, true);
|
||||
|
||||
assertEquals(1, credentials.numberOfTokens());
|
||||
assertNotNull(credentials.getToken(service));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithTokenExists() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service = new Text("singleTokenFs");
|
||||
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||
Token<?> token = mock(Token.class);
|
||||
credentials.addToken(service, token);
|
||||
|
||||
fs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(fs, false);
|
||||
|
||||
assertEquals(1, credentials.numberOfTokens());
|
||||
assertSame(token, credentials.getToken(service));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithChildTokens() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service1 = new Text("singleTokenFs1");
|
||||
Text service2 = new Text("singleTokenFs2");
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||
MockFileSystem multiFs =
|
||||
createFileSystemForServiceName(null, fs1, fs2, fs3);
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, false); // has no tokens of own, only child tokens
|
||||
verifyTokenFetch(fs1, true);
|
||||
verifyTokenFetch(fs2, true);
|
||||
verifyTokenFetch(fs3, false);
|
||||
|
||||
assertEquals(2, credentials.numberOfTokens());
|
||||
assertNotNull(credentials.getToken(service1));
|
||||
assertNotNull(credentials.getToken(service2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithDuplicateChildren() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service = new Text("singleTokenFs1");
|
||||
|
||||
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||
MockFileSystem multiFs =
|
||||
createFileSystemForServiceName(null, fs, new FilterFileSystem(fs));
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, false);
|
||||
verifyTokenFetch(fs, true);
|
||||
|
||||
assertEquals(1, credentials.numberOfTokens());
|
||||
assertNotNull(credentials.getToken(service));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithDuplicateChildrenTokenExists() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service = new Text("singleTokenFs1");
|
||||
Token<?> token = mock(Token.class);
|
||||
credentials.addToken(service, token);
|
||||
|
||||
MockFileSystem fs = createFileSystemForServiceName(service);
|
||||
MockFileSystem multiFs =
|
||||
createFileSystemForServiceName(null, fs, new FilterFileSystem(fs));
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, false);
|
||||
verifyTokenFetch(fs, false);
|
||||
|
||||
assertEquals(1, credentials.numberOfTokens());
|
||||
assertSame(token, credentials.getToken(service));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithChildTokensOneExists() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service1 = new Text("singleTokenFs1");
|
||||
Text service2 = new Text("singleTokenFs2");
|
||||
Token<?> token = mock(Token.class);
|
||||
credentials.addToken(service2, token);
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||
MockFileSystem multiFs = createFileSystemForServiceName(null, fs1, fs2, fs3);
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, false);
|
||||
verifyTokenFetch(fs1, true);
|
||||
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||
verifyTokenFetch(fs3, false);
|
||||
|
||||
assertEquals(2, credentials.numberOfTokens());
|
||||
assertNotNull(credentials.getToken(service1));
|
||||
assertSame(token, credentials.getToken(service2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithMyOwnAndChildTokens() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service1 = new Text("singleTokenFs1");
|
||||
Text service2 = new Text("singleTokenFs2");
|
||||
Text myService = new Text("multiTokenFs");
|
||||
Token<?> token = mock(Token.class);
|
||||
credentials.addToken(service2, token);
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||
MockFileSystem multiFs = createFileSystemForServiceName(myService, fs1, fs2);
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, true); // its own token and also of its children
|
||||
verifyTokenFetch(fs1, true);
|
||||
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||
|
||||
assertEquals(3, credentials.numberOfTokens());
|
||||
assertNotNull(credentials.getToken(myService));
|
||||
assertNotNull(credentials.getToken(service1));
|
||||
assertNotNull(credentials.getToken(service2));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testFsWithMyOwnExistsAndChildTokens() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service1 = new Text("singleTokenFs1");
|
||||
Text service2 = new Text("singleTokenFs2");
|
||||
Text myService = new Text("multiTokenFs");
|
||||
Token<?> token = mock(Token.class);
|
||||
credentials.addToken(myService, token);
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||
MockFileSystem multiFs = createFileSystemForServiceName(myService, fs1, fs2);
|
||||
|
||||
multiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(multiFs, false); // we had added its token to credentials
|
||||
verifyTokenFetch(fs1, true);
|
||||
verifyTokenFetch(fs2, true);
|
||||
|
||||
assertEquals(3, credentials.numberOfTokens());
|
||||
assertSame(token, credentials.getToken(myService));
|
||||
assertNotNull(credentials.getToken(service1));
|
||||
assertNotNull(credentials.getToken(service2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFsWithNestedDuplicatesChildren() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Text service1 = new Text("singleTokenFs1");
|
||||
Text service2 = new Text("singleTokenFs2");
|
||||
Text service4 = new Text("singleTokenFs4");
|
||||
Text multiService = new Text("multiTokenFs");
|
||||
Token<?> token2 = mock(Token.class);
|
||||
credentials.addToken(service2, token2);
|
||||
|
||||
MockFileSystem fs1 = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs1B = createFileSystemForServiceName(service1);
|
||||
MockFileSystem fs2 = createFileSystemForServiceName(service2);
|
||||
MockFileSystem fs3 = createFileSystemForServiceName(null);
|
||||
MockFileSystem fs4 = createFileSystemForServiceName(service4);
|
||||
// now let's get dirty! ensure dup tokens aren't fetched even when
|
||||
// repeated and dupped in a nested fs. fs4 is a real test of the drill
|
||||
// down: multi-filter-multi-filter-filter-fs4.
|
||||
MockFileSystem multiFs = createFileSystemForServiceName(multiService,
|
||||
fs1, fs1B, fs2, fs2, new FilterFileSystem(fs3),
|
||||
new FilterFileSystem(new FilterFileSystem(fs4)));
|
||||
MockFileSystem superMultiFs = createFileSystemForServiceName(null,
|
||||
fs1, fs1B, fs1, new FilterFileSystem(fs3), new FilterFileSystem(multiFs));
|
||||
superMultiFs.addDelegationTokens(renewer, credentials);
|
||||
verifyTokenFetch(superMultiFs, false); // does not have its own token
|
||||
verifyTokenFetch(multiFs, true); // has its own token
|
||||
verifyTokenFetch(fs1, true);
|
||||
verifyTokenFetch(fs2, false); // we had added its token to credentials
|
||||
verifyTokenFetch(fs3, false); // has no tokens
|
||||
verifyTokenFetch(fs4, true);
|
||||
|
||||
assertEquals(4, credentials.numberOfTokens()); //fs1+fs2+fs4+multifs (fs3=0)
|
||||
assertNotNull(credentials.getToken(service1));
|
||||
assertNotNull(credentials.getToken(service2));
|
||||
assertSame(token2, credentials.getToken(service2));
|
||||
assertNotNull(credentials.getToken(multiService));
|
||||
assertNotNull(credentials.getToken(service4));
|
||||
}
|
||||
|
||||
public static MockFileSystem createFileSystemForServiceName(
|
||||
final Text service, final FileSystem... children) throws IOException {
|
||||
final MockFileSystem fs = new MockFileSystem();
|
||||
final MockFileSystem mockFs = fs.getRawFileSystem();
|
||||
if (service != null) {
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn(service.toString());
|
||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||
new Answer<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
||||
Token<?> token = new Token<TokenIdentifier>();
|
||||
token.setService(service);
|
||||
return token;
|
||||
}
|
||||
});
|
||||
}
|
||||
when(mockFs.getChildFileSystems()).thenReturn(children);
|
||||
return fs;
|
||||
}
|
||||
|
||||
// check that canonical name was requested, if renewer is not null that
|
||||
// a token was requested, and that child fs was invoked
|
||||
private void verifyTokenFetch(MockFileSystem fs, boolean expected) throws IOException {
|
||||
verify(fs.getRawFileSystem(), atLeast(1)).getCanonicalServiceName();
|
||||
if (expected) {
|
||||
verify(fs.getRawFileSystem()).getDelegationToken(renewer);
|
||||
} else {
|
||||
verify(fs.getRawFileSystem(), never()).getDelegationToken(any(String.class));
|
||||
}
|
||||
verify(fs.getRawFileSystem(), atLeast(1)).getChildFileSystems();
|
||||
}
|
||||
}
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Options.Rename;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.junit.BeforeClass;
|
||||
@ -185,6 +186,10 @@ public boolean deleteOnExit(Path f) throws IOException {
|
||||
public boolean cancelDeleteOnExit(Path f) throws IOException {
|
||||
return false;
|
||||
}
|
||||
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
|
||||
throws IOException {
|
||||
return null;
|
||||
}
|
||||
public String getScheme() {
|
||||
return "dontcheck";
|
||||
}
|
||||
|
@ -22,10 +22,18 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsConstants;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
@ -38,6 +46,29 @@
|
||||
public class TestViewFileSystemDelegationTokenSupport {
|
||||
|
||||
private static final String MOUNT_TABLE_NAME = "vfs-cluster";
|
||||
static Configuration conf;
|
||||
static FileSystem viewFs;
|
||||
static FakeFileSystem fs1;
|
||||
static FakeFileSystem fs2;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
conf = ViewFileSystemTestSetup.createConfig();
|
||||
fs1 = setupFileSystem(new URI("fs1:///"), FakeFileSystem.class);
|
||||
fs2 = setupFileSystem(new URI("fs2:///"), FakeFileSystem.class);
|
||||
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
|
||||
}
|
||||
|
||||
static FakeFileSystem setupFileSystem(URI uri, Class<? extends FileSystem> clazz)
|
||||
throws Exception {
|
||||
String scheme = uri.getScheme();
|
||||
conf.set("fs."+scheme+".impl", clazz.getName());
|
||||
FakeFileSystem fs = (FakeFileSystem)FileSystem.get(uri, conf);
|
||||
// mount each fs twice, will later ensure 1 token/fs
|
||||
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-one", fs.getUri());
|
||||
ConfigUtil.addLink(conf, "/mounts/"+scheme+"-two", fs.getUri());
|
||||
return fs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Regression test for HADOOP-8408.
|
||||
@ -69,4 +100,92 @@ public void testGetCanonicalServiceNameWithDefaultMountTable()
|
||||
assertNull(serviceName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetChildFileSystems() throws Exception {
|
||||
assertNull(fs1.getChildFileSystems());
|
||||
assertNull(fs2.getChildFileSystems());
|
||||
List<FileSystem> children = Arrays.asList(viewFs.getChildFileSystems());
|
||||
assertEquals(2, children.size());
|
||||
assertTrue(children.contains(fs1));
|
||||
assertTrue(children.contains(fs2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddDelegationTokens() throws Exception {
|
||||
Credentials creds = new Credentials();
|
||||
Token<?> fs1Tokens[] = addTokensWithCreds(fs1, creds);
|
||||
assertEquals(1, fs1Tokens.length);
|
||||
assertEquals(1, creds.numberOfTokens());
|
||||
Token<?> fs2Tokens[] = addTokensWithCreds(fs2, creds);
|
||||
assertEquals(1, fs2Tokens.length);
|
||||
assertEquals(2, creds.numberOfTokens());
|
||||
|
||||
Credentials savedCreds = creds;
|
||||
creds = new Credentials();
|
||||
|
||||
// should get the same set of tokens as explicitly fetched above
|
||||
Token<?> viewFsTokens[] = viewFs.addDelegationTokens("me", creds);
|
||||
assertEquals(2, viewFsTokens.length);
|
||||
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||
// should get none, already have all tokens
|
||||
viewFsTokens = viewFs.addDelegationTokens("me", creds);
|
||||
assertEquals(0, viewFsTokens.length);
|
||||
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||
}
|
||||
|
||||
Token<?>[] addTokensWithCreds(FileSystem fs, Credentials creds) throws Exception {
|
||||
Credentials savedCreds;
|
||||
|
||||
savedCreds = new Credentials(creds);
|
||||
Token<?> tokens[] = fs.addDelegationTokens("me", creds);
|
||||
// test that we got the token we wanted, and that creds were modified
|
||||
assertEquals(1, tokens.length);
|
||||
assertEquals(fs.getCanonicalServiceName(), tokens[0].getService().toString());
|
||||
assertTrue(creds.getAllTokens().contains(tokens[0]));
|
||||
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||
assertEquals(savedCreds.numberOfTokens()+1, creds.numberOfTokens());
|
||||
|
||||
// shouldn't get any new tokens since already in creds
|
||||
savedCreds = new Credentials(creds);
|
||||
Token<?> tokenRefetch[] = fs.addDelegationTokens("me", creds);
|
||||
assertEquals(0, tokenRefetch.length);
|
||||
assertTrue(creds.getAllTokens().containsAll(savedCreds.getAllTokens()));
|
||||
assertEquals(savedCreds.numberOfTokens(), creds.numberOfTokens());
|
||||
|
||||
return tokens;
|
||||
}
|
||||
|
||||
static class FakeFileSystem extends RawLocalFileSystem {
|
||||
URI uri;
|
||||
|
||||
public void initialize(URI name, Configuration conf) throws IOException {
|
||||
this.uri = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getInitialWorkingDirectory() {
|
||||
return new Path("/"); // ctor calls getUri before the uri is inited...
|
||||
}
|
||||
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getCanonicalServiceName() {
|
||||
return String.valueOf(this.getUri()+"/"+this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getDelegationToken(String renewer) throws IOException {
|
||||
Token<?> token = new Token<TokenIdentifier>();
|
||||
token.setService(new Text(getCanonicalServiceName()));
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@ -137,9 +138,9 @@ int getExpectedMountPoints() {
|
||||
*/
|
||||
@Test
|
||||
public void testGetDelegationTokens() throws IOException {
|
||||
List<Token<?>> delTokens =
|
||||
fsView.getDelegationTokens("sanjay");
|
||||
Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.size());
|
||||
Token<?>[] delTokens =
|
||||
fsView.addDelegationTokens("sanjay", new Credentials());
|
||||
Assert.assertEquals(getExpectedDelegationTokenCount(), delTokens.length);
|
||||
}
|
||||
|
||||
int getExpectedDelegationTokenCount() {
|
||||
@ -150,29 +151,20 @@ int getExpectedDelegationTokenCount() {
|
||||
public void testGetDelegationTokensWithCredentials() throws IOException {
|
||||
Credentials credentials = new Credentials();
|
||||
List<Token<?>> delTokens =
|
||||
fsView.getDelegationTokens("sanjay", credentials);
|
||||
Arrays.asList(fsView.addDelegationTokens("sanjay", credentials));
|
||||
|
||||
int expectedTokenCount = getExpectedDelegationTokenCountWithCredentials();
|
||||
|
||||
Assert.assertEquals(expectedTokenCount, delTokens.size());
|
||||
Credentials newCredentials = new Credentials();
|
||||
for (int i = 0; i < expectedTokenCount / 2; i++) {
|
||||
Token<?> token = delTokens.get(i);
|
||||
credentials.addToken(token.getService(), token);
|
||||
newCredentials.addToken(token.getService(), token);
|
||||
}
|
||||
|
||||
List<Token<?>> delTokens2 =
|
||||
fsView.getDelegationTokens("sanjay", credentials);
|
||||
Assert.assertEquals(expectedTokenCount, delTokens2.size());
|
||||
|
||||
for (int i = 0; i < delTokens2.size(); i++) {
|
||||
for (int j = 0; j < delTokens.size(); j++) {
|
||||
if (delTokens.get(j) == delTokens2.get(i)) {
|
||||
delTokens.remove(j);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Assert.assertEquals((expectedTokenCount + 1) / 2, delTokens.size());
|
||||
Arrays.asList(fsView.addDelegationTokens("sanjay", newCredentials));
|
||||
Assert.assertEquals((expectedTokenCount + 1) / 2, delTokens2.size());
|
||||
}
|
||||
|
||||
int getExpectedDelegationTokenCountWithCredentials() {
|
||||
|
@ -863,7 +863,6 @@ public void readFields(DataInput in) throws IOException {
|
||||
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public Token<?> getDelegationToken(final String renewer)
|
||||
throws IOException {
|
||||
return doAsRealUserIfNecessary(new Callable<Token<?>>() {
|
||||
@ -875,19 +874,6 @@ public Token<?> call() throws Exception {
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<Token<?>> getDelegationTokens(final String renewer)
|
||||
throws IOException {
|
||||
return doAsRealUserIfNecessary(new Callable<List<Token<?>>>() {
|
||||
@Override
|
||||
public List<Token<?>> call() throws Exception {
|
||||
return HttpFSKerberosAuthenticator.
|
||||
getDelegationTokens(uri, httpFSAddr, authToken, renewer);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||
return doAsRealUserIfNecessary(new Callable<Long>() {
|
||||
@Override
|
||||
|
@ -66,7 +66,6 @@ protected Authenticator getFallBackAuthenticator() {
|
||||
public static final String RENEWER_PARAM = "renewer";
|
||||
public static final String TOKEN_KIND = "HTTPFS_DELEGATION_TOKEN";
|
||||
public static final String DELEGATION_TOKEN_JSON = "Token";
|
||||
public static final String DELEGATION_TOKENS_JSON = "Tokens";
|
||||
public static final String DELEGATION_TOKEN_URL_STRING_JSON = "urlString";
|
||||
public static final String RENEW_DELEGATION_TOKEN_JSON = "long";
|
||||
|
||||
@ -76,7 +75,6 @@ protected Authenticator getFallBackAuthenticator() {
|
||||
@InterfaceAudience.Private
|
||||
public static enum DelegationTokenOperation {
|
||||
GETDELEGATIONTOKEN(HTTP_GET, true),
|
||||
GETDELEGATIONTOKENS(HTTP_GET, true),
|
||||
RENEWDELEGATIONTOKEN(HTTP_PUT, true),
|
||||
CANCELDELEGATIONTOKEN(HTTP_PUT, false);
|
||||
|
||||
@ -121,10 +119,11 @@ public void authenticate(URL url, AuthenticatedURL.Token token)
|
||||
|
||||
public static final String OP_PARAM = "op";
|
||||
|
||||
private static List<Token<?>> getDelegationTokens(URI fsURI,
|
||||
InetSocketAddress httpFSAddr, DelegationTokenOperation op,
|
||||
AuthenticatedURL.Token token, String renewer)
|
||||
throws IOException {
|
||||
public static Token<?> getDelegationToken(URI fsURI,
|
||||
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
||||
String renewer) throws IOException {
|
||||
DelegationTokenOperation op =
|
||||
DelegationTokenOperation.GETDELEGATIONTOKEN;
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
params.put(OP_PARAM, op.toString());
|
||||
params.put(RENEWER_PARAM,renewer);
|
||||
@ -135,56 +134,20 @@ private static List<Token<?>> getDelegationTokens(URI fsURI,
|
||||
HttpURLConnection conn = aUrl.openConnection(url, token);
|
||||
conn.setRequestMethod(op.getHttpMethod());
|
||||
HttpFSUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||
List<String> list = new ArrayList<String>();
|
||||
if (op == DelegationTokenOperation.GETDELEGATIONTOKEN) {
|
||||
JSONObject json = (JSONObject) ((JSONObject)
|
||||
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKEN_JSON);
|
||||
String tokenStr = (String)
|
||||
json.get(DELEGATION_TOKEN_URL_STRING_JSON);
|
||||
list.add(tokenStr);
|
||||
}
|
||||
else if (op == DelegationTokenOperation.GETDELEGATIONTOKENS) {
|
||||
JSONObject json = (JSONObject) ((JSONObject)
|
||||
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKENS_JSON);
|
||||
JSONArray array = (JSONArray) json.get(DELEGATION_TOKEN_JSON);
|
||||
for (Object element : array) {
|
||||
String tokenStr = (String)
|
||||
((Map) element).get(DELEGATION_TOKEN_URL_STRING_JSON);
|
||||
list.add(tokenStr);
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new IllegalArgumentException("Invalid operation: " +
|
||||
op.toString());
|
||||
}
|
||||
List<Token<?>> dTokens = new ArrayList<Token<?>>();
|
||||
for (String tokenStr : list) {
|
||||
Token<AbstractDelegationTokenIdentifier> dToken =
|
||||
new Token<AbstractDelegationTokenIdentifier>();
|
||||
dToken.decodeFromUrlString(tokenStr);
|
||||
dTokens.add(dToken);
|
||||
SecurityUtil.setTokenService(dToken, httpFSAddr);
|
||||
}
|
||||
return dTokens;
|
||||
JSONObject json = (JSONObject) ((JSONObject)
|
||||
HttpFSUtils.jsonParse(conn)).get(DELEGATION_TOKEN_JSON);
|
||||
String tokenStr = (String)
|
||||
json.get(DELEGATION_TOKEN_URL_STRING_JSON);
|
||||
Token<AbstractDelegationTokenIdentifier> dToken =
|
||||
new Token<AbstractDelegationTokenIdentifier>();
|
||||
dToken.decodeFromUrlString(tokenStr);
|
||||
SecurityUtil.setTokenService(dToken, httpFSAddr);
|
||||
return dToken;
|
||||
} catch (AuthenticationException ex) {
|
||||
throw new IOException(ex.toString(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Token<?>> getDelegationTokens(URI fsURI,
|
||||
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
||||
String renewer) throws IOException {
|
||||
return getDelegationTokens(fsURI, httpFSAddr,
|
||||
DelegationTokenOperation.GETDELEGATIONTOKENS, token, renewer);
|
||||
}
|
||||
|
||||
public static Token<?> getDelegationToken(URI fsURI,
|
||||
InetSocketAddress httpFSAddr, AuthenticatedURL.Token token,
|
||||
String renewer) throws IOException {
|
||||
return getDelegationTokens(fsURI, httpFSAddr,
|
||||
DelegationTokenOperation.GETDELEGATIONTOKENS, token, renewer).get(0);
|
||||
}
|
||||
|
||||
public static long renewDelegationToken(URI fsURI,
|
||||
AuthenticatedURL.Token token, Token<?> dToken) throws IOException {
|
||||
Map<String, String> params = new HashMap<String, String>();
|
||||
|
@ -63,8 +63,6 @@ public class HttpFSKerberosAuthenticationHandler
|
||||
static {
|
||||
DELEGATION_TOKEN_OPS.add(
|
||||
DelegationTokenOperation.GETDELEGATIONTOKEN.toString());
|
||||
DELEGATION_TOKEN_OPS.add(
|
||||
DelegationTokenOperation.GETDELEGATIONTOKENS.toString());
|
||||
DELEGATION_TOKEN_OPS.add(
|
||||
DelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
|
||||
DELEGATION_TOKEN_OPS.add(
|
||||
@ -111,7 +109,6 @@ public boolean managementOperation(AuthenticationToken token,
|
||||
Map map = null;
|
||||
switch (dtOp) {
|
||||
case GETDELEGATIONTOKEN:
|
||||
case GETDELEGATIONTOKENS:
|
||||
String renewerParam =
|
||||
request.getParameter(HttpFSKerberosAuthenticator.RENEWER_PARAM);
|
||||
if (renewerParam == null) {
|
||||
@ -119,11 +116,7 @@ public boolean managementOperation(AuthenticationToken token,
|
||||
}
|
||||
Token<?> dToken = tokenManager.createToken(
|
||||
UserGroupInformation.getCurrentUser(), renewerParam);
|
||||
if (dtOp == DelegationTokenOperation.GETDELEGATIONTOKEN) {
|
||||
map = delegationTokenToJSON(dToken);
|
||||
} else {
|
||||
map = delegationTokensToJSON(Arrays.asList((Token)dToken));
|
||||
}
|
||||
map = delegationTokenToJSON(dToken);
|
||||
break;
|
||||
case RENEWDELEGATIONTOKEN:
|
||||
case CANCELDELEGATIONTOKEN:
|
||||
@ -191,23 +184,6 @@ private static Map delegationTokenToJSON(Token token) throws IOException {
|
||||
return response;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map delegationTokensToJSON(List<Token> tokens)
|
||||
throws IOException {
|
||||
List list = new ArrayList();
|
||||
for (Token token : tokens) {
|
||||
Map map = new HashMap();
|
||||
map.put(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON,
|
||||
token.encodeToUrlString());
|
||||
list.add(map);
|
||||
}
|
||||
Map map = new HashMap();
|
||||
map.put(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON, list);
|
||||
Map response = new LinkedHashMap();
|
||||
response.put(HttpFSKerberosAuthenticator.DELEGATION_TOKENS_JSON, map);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticates a request looking for the <code>delegation</code>
|
||||
* query-string parameter and verifying it is a valid token. If there is not
|
||||
|
@ -68,10 +68,8 @@ public void testManagementOperations() throws Exception {
|
||||
|
||||
testNonManagementOperation(handler);
|
||||
testManagementOperationErrors(handler);
|
||||
testGetToken(handler, false, null);
|
||||
testGetToken(handler, true, null);
|
||||
testGetToken(handler, false, "foo");
|
||||
testGetToken(handler, true, "foo");
|
||||
testGetToken(handler, null);
|
||||
testGetToken(handler, "foo");
|
||||
testCancelToken(handler);
|
||||
testRenewToken(handler);
|
||||
|
||||
@ -115,12 +113,9 @@ private void testManagementOperationErrors(AuthenticationHandler handler)
|
||||
Mockito.contains("requires SPNEGO"));
|
||||
}
|
||||
|
||||
private void testGetToken(AuthenticationHandler handler, boolean tokens,
|
||||
String renewer)
|
||||
private void testGetToken(AuthenticationHandler handler, String renewer)
|
||||
throws Exception {
|
||||
DelegationTokenOperation op =
|
||||
(tokens) ? DelegationTokenOperation.GETDELEGATIONTOKENS
|
||||
: DelegationTokenOperation.GETDELEGATIONTOKEN;
|
||||
DelegationTokenOperation op = DelegationTokenOperation.GETDELEGATIONTOKEN;
|
||||
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
|
||||
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
|
||||
Mockito.when(request.getParameter(HttpFSFileSystem.OP_PARAM)).
|
||||
@ -148,23 +143,13 @@ private void testGetToken(AuthenticationHandler handler, boolean tokens,
|
||||
Mockito.verify(response).setContentType(MediaType.APPLICATION_JSON);
|
||||
pwriter.close();
|
||||
String responseOutput = writer.toString();
|
||||
String tokenLabel = (tokens)
|
||||
? HttpFSKerberosAuthenticator.DELEGATION_TOKENS_JSON
|
||||
: HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON;
|
||||
if (tokens) {
|
||||
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
||||
} else {
|
||||
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
||||
}
|
||||
String tokenLabel = HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON;
|
||||
Assert.assertTrue(responseOutput.contains(tokenLabel));
|
||||
Assert.assertTrue(responseOutput.contains(
|
||||
HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON));
|
||||
JSONObject json = (JSONObject) new JSONParser().parse(responseOutput);
|
||||
json = (JSONObject) json.get(tokenLabel);
|
||||
String tokenStr;
|
||||
if (tokens) {
|
||||
json = (JSONObject) ((JSONArray)
|
||||
json.get(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_JSON)).get(0);
|
||||
}
|
||||
tokenStr = (String)
|
||||
json.get(HttpFSKerberosAuthenticator.DELEGATION_TOKEN_URL_STRING_JSON);
|
||||
Token<DelegationTokenIdentifier> dt = new Token<DelegationTokenIdentifier>();
|
||||
|
@ -222,10 +222,11 @@ private void testDelegationTokenWithFS(Class fileSystemClass)
|
||||
URI uri = new URI( "webhdfs://" +
|
||||
TestJettyHelper.getJettyURL().toURI().getAuthority());
|
||||
FileSystem fs = FileSystem.get(uri, conf);
|
||||
Token<?> token = fs.getDelegationToken("foo");
|
||||
Token<?> tokens[] = fs.addDelegationTokens("foo", null);
|
||||
fs.close();
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
fs = FileSystem.get(uri, conf);
|
||||
((DelegationTokenRenewer.Renewable) fs).setDelegationToken(token);
|
||||
((DelegationTokenRenewer.Renewable) fs).setDelegationToken(tokens[0]);
|
||||
fs.listStatus(new Path("/"));
|
||||
fs.close();
|
||||
}
|
||||
|
@ -24,7 +24,6 @@
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
@ -765,14 +764,6 @@ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
|
||||
return getDelegationToken(renewer.toString());
|
||||
}
|
||||
|
||||
@Override // FileSystem
|
||||
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
|
||||
List<Token<?>> tokenList = new ArrayList<Token<?>>();
|
||||
Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
|
||||
tokenList.add(token);
|
||||
return tokenList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Renew an existing delegation token.
|
||||
*
|
||||
|
@ -669,17 +669,6 @@ private Response get(
|
||||
final String js = JsonUtil.toJsonString(token);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETDELEGATIONTOKENS:
|
||||
{
|
||||
if (delegation.getValue() != null) {
|
||||
throw new IllegalArgumentException(delegation.getName()
|
||||
+ " parameter is not null.");
|
||||
}
|
||||
final Token<? extends TokenIdentifier>[] tokens = new Token<?>[1];
|
||||
tokens[0] = generateDelegationToken(namenode, ugi, renewer.getValue());
|
||||
final String js = JsonUtil.toJsonString(tokens);
|
||||
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
|
||||
}
|
||||
case GETHOMEDIRECTORY:
|
||||
{
|
||||
final String js = JsonUtil.toJsonString(
|
||||
|
@ -189,13 +189,14 @@ public Object run() throws Exception {
|
||||
}
|
||||
} else {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
Token<?> token = fs.getDelegationToken(renewer);
|
||||
Credentials cred = new Credentials();
|
||||
cred.addToken(token.getService(), token);
|
||||
Token<?> tokens[] = fs.addDelegationTokens(renewer, cred);
|
||||
cred.writeTokenStorageFile(tokenFile, conf);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Fetched token for " + token.getService()
|
||||
+ " into " + tokenFile);
|
||||
for (Token<?> token : tokens) {
|
||||
LOG.debug("Fetched token for " + token.getService()
|
||||
+ " into " + tokenFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,6 @@
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
@ -376,8 +375,7 @@ URL toUrl(final HttpOpParam.Op op, final Path fspath,
|
||||
+ Param.toSortedString("&", parameters);
|
||||
final URL url;
|
||||
if (op == PutOpParam.Op.RENEWDELEGATIONTOKEN
|
||||
|| op == GetOpParam.Op.GETDELEGATIONTOKEN
|
||||
|| op == GetOpParam.Op.GETDELEGATIONTOKENS) {
|
||||
|| op == GetOpParam.Op.GETDELEGATIONTOKEN) {
|
||||
// Skip adding delegation token for getting or renewing delegation token,
|
||||
// because these operations require kerberos authentication.
|
||||
url = getNamenodeURL(path, query);
|
||||
@ -840,10 +838,9 @@ public FileStatus[] listStatus(final Path f) throws IOException {
|
||||
return statuses;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
||||
) throws IOException {
|
||||
public Token<DelegationTokenIdentifier> getDelegationToken(
|
||||
final String renewer) throws IOException {
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
|
||||
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
||||
final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m);
|
||||
@ -851,18 +848,6 @@ public Token<DelegationTokenIdentifier> getDelegationToken(final String renewer
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Token<?>> getDelegationTokens(final String renewer
|
||||
) throws IOException {
|
||||
final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKENS;
|
||||
final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
|
||||
final List<Token<?>> tokens = JsonUtil.toTokenList(m);
|
||||
for(Token<?> t : tokens) {
|
||||
SecurityUtil.setTokenService(t, nnAddr);
|
||||
}
|
||||
return tokens;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getRenewToken() {
|
||||
return delegationToken;
|
||||
|
@ -32,7 +32,6 @@ public static enum Op implements HttpOpParam.Op {
|
||||
|
||||
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
||||
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
|
||||
GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
|
||||
|
||||
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
||||
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
||||
|
@ -59,7 +59,9 @@ public static void clusterSetupAtBegining() throws IOException,
|
||||
|
||||
@AfterClass
|
||||
public static void clusterShutdownAtEnd() throws Exception {
|
||||
cluster.shutdown();
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -84,7 +86,7 @@ void initializeTargetTestRoot() throws IOException {
|
||||
|
||||
@Override
|
||||
int getExpectedDelegationTokenCount() {
|
||||
return 8;
|
||||
return 1; // all point to the same fs so 1 unique token
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -117,7 +117,7 @@ int getExpectedMountPoints() {
|
||||
|
||||
@Override
|
||||
int getExpectedDelegationTokenCount() {
|
||||
return 9;
|
||||
return 2; // Mount points to 2 unique hdfs
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -28,8 +28,6 @@
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
@ -50,6 +48,7 @@
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
@ -154,25 +153,18 @@ public void testCancelDelegationToken() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenDFSApi() throws Exception {
|
||||
public void testAddDelegationTokensDFSApi() throws Exception {
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("JobTracker");
|
||||
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
||||
final Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
new ByteArrayInputStream(tokenId)));
|
||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
UserGroupInformation.createRemoteUser("JobTracker").doAs(
|
||||
new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Credentials creds = new Credentials();
|
||||
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
Assert.assertEquals(1, creds.numberOfTokens());
|
||||
checkTokenIdentifier(ugi, tokens[0]);
|
||||
|
||||
final Token<?> tokens2[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||
Assert.assertEquals(0, tokens2.length); // already have token
|
||||
Assert.assertEquals(1, creds.numberOfTokens());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -191,43 +183,15 @@ public WebHdfsFileSystem run() throws Exception {
|
||||
}
|
||||
});
|
||||
|
||||
{ //test getDelegationToken(..)
|
||||
final Token<DelegationTokenIdentifier> token = webhdfs
|
||||
.getDelegationToken("JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
{ //test getDelegationTokens(..)
|
||||
final List<Token<?>> tokenlist = webhdfs.getDelegationTokens("JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Token<DelegationTokenIdentifier> token = (Token<DelegationTokenIdentifier>)tokenlist.get(0);
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(tokenId)));
|
||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken(token, "JobTracker");
|
||||
ugi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
{ //test addDelegationTokens(..)
|
||||
Credentials creds = new Credentials();
|
||||
final Token<?> tokens[] = webhdfs.addDelegationTokens("JobTracker", creds);
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
Assert.assertEquals(1, creds.numberOfTokens());
|
||||
Assert.assertSame(tokens[0], creds.getAllTokens().iterator().next());
|
||||
checkTokenIdentifier(ugi, tokens[0]);
|
||||
final Token<?> tokens2[] = webhdfs.addDelegationTokens("JobTracker", creds);
|
||||
Assert.assertEquals(0, tokens2.length);
|
||||
}
|
||||
}
|
||||
|
||||
@ -235,8 +199,12 @@ public Void run() throws Exception {
|
||||
@Test
|
||||
public void testDelegationTokenWithDoAs() throws Exception {
|
||||
final DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
dfs.getDelegationToken("JobTracker");
|
||||
final Credentials creds = new Credentials();
|
||||
final Token<?> tokens[] = dfs.addDelegationTokens("JobTracker", creds);
|
||||
Assert.assertEquals(1, tokens.length);
|
||||
@SuppressWarnings("unchecked")
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
(Token<DelegationTokenIdentifier>) tokens[0];
|
||||
final UserGroupInformation longUgi = UserGroupInformation
|
||||
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
||||
final UserGroupInformation shortUgi = UserGroupInformation
|
||||
@ -326,4 +294,33 @@ public void testDTManagerInSafeMode() throws Exception {
|
||||
assertFalse(nn.isInSafeMode());
|
||||
assertTrue(sm.isRunning());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void checkTokenIdentifier(UserGroupInformation ugi, final Token<?> token)
|
||||
throws Exception {
|
||||
Assert.assertNotNull(token);
|
||||
// should be able to use token.decodeIdentifier() but webhdfs isn't
|
||||
// registered with the service loader for token decoding
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
DataInputStream in = new DataInputStream(new ByteArrayInputStream(tokenId));
|
||||
try {
|
||||
identifier.readFields(in);
|
||||
} finally {
|
||||
in.close();
|
||||
}
|
||||
Assert.assertNotNull(identifier);
|
||||
LOG.info("A valid token should have non-null password, and should be renewed successfully");
|
||||
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||
dtSecretManager.renewToken((Token<DelegationTokenIdentifier>) token, "JobTracker");
|
||||
ugi.doAs(
|
||||
new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -135,15 +135,15 @@ public void testDelegationTokenWithRealUser() throws IOException {
|
||||
final UserGroupInformation proxyUgi = UserGroupInformation
|
||||
.createProxyUserForTesting(PROXY_USER, ugi, GROUP_NAMES);
|
||||
try {
|
||||
Token<DelegationTokenIdentifier> token = proxyUgi
|
||||
.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
||||
Token<?>[] tokens = proxyUgi
|
||||
.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
|
||||
@Override
|
||||
public Token<DelegationTokenIdentifier> run() throws IOException {
|
||||
return cluster.getFileSystem().getDelegationToken("RenewerUser");
|
||||
public Token<?>[] run() throws IOException {
|
||||
return cluster.getFileSystem().addDelegationTokens("RenewerUser", null);
|
||||
}
|
||||
});
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
byte[] tokenId = tokens[0].getIdentifier();
|
||||
identifier.readFields(new DataInputStream(new ByteArrayInputStream(
|
||||
tokenId)));
|
||||
Assert.assertEquals(identifier.getUser().getUserName(), PROXY_USER);
|
||||
|
@ -41,7 +41,6 @@
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
@ -195,20 +194,21 @@ private CheckpointSignature runOperations() throws IOException {
|
||||
Path pathSymlink = new Path("/file_symlink");
|
||||
fc.createSymlink(pathConcatTarget, pathSymlink, false);
|
||||
// OP_GET_DELEGATION_TOKEN 18
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
dfs.getDelegationToken("JobTracker");
|
||||
// OP_RENEW_DELEGATION_TOKEN 19
|
||||
// OP_CANCEL_DELEGATION_TOKEN 20
|
||||
// see TestDelegationToken.java
|
||||
// fake the user to renew token for
|
||||
final Token<?>[] tokens = dfs.addDelegationTokens("JobTracker", null);
|
||||
UserGroupInformation longUgi = UserGroupInformation.createRemoteUser(
|
||||
"JobTracker/foo.com@FOO.COM");
|
||||
try {
|
||||
longUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public Object run() throws IOException, InterruptedException {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
for (Token<?> token : tokens) {
|
||||
token.renew(config);
|
||||
token.cancel(config);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -116,7 +116,8 @@ public void prepTest() {
|
||||
|
||||
@Test
|
||||
public void testDelegationTokenDFSApi() throws Exception {
|
||||
Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||
byte[] tokenId = token.getIdentifier();
|
||||
identifier.readFields(new DataInputStream(
|
||||
@ -157,8 +158,8 @@ public void testDelegationTokenDFSApi() throws Exception {
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void testDelegationTokenWithDoAs() throws Exception {
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
dfs.getDelegationToken("JobTracker");
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
final UserGroupInformation longUgi = UserGroupInformation
|
||||
.createRemoteUser("JobTracker/foo.com@FOO.COM");
|
||||
final UserGroupInformation shortUgi = UserGroupInformation
|
||||
@ -196,8 +197,8 @@ public Void run() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testHAUtilClonesDelegationTokens() throws Exception {
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
dfs.getDelegationToken("test");
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
|
||||
|
||||
@ -258,8 +259,9 @@ public void testDFSGetCanonicalServiceName() throws Exception {
|
||||
URI hAUri = HATestUtil.getLogicalUri(cluster);
|
||||
String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri).toString();
|
||||
assertEquals(haService, dfs.getCanonicalServiceName());
|
||||
Token<?> token = dfs.getDelegationToken(
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
final String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
final Token<DelegationTokenIdentifier> token =
|
||||
getDelegationToken(dfs, renewer);
|
||||
assertEquals(haService, token.getService().toString());
|
||||
// make sure the logical uri is handled correctly
|
||||
token.renew(dfs.getConf());
|
||||
@ -281,6 +283,13 @@ public void testHdfsGetCanonicalServiceName() throws Exception {
|
||||
token.cancel(conf);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
||||
String renewer) throws IOException {
|
||||
final Token<?> tokens[] = fs.addDelegationTokens(renewer, null);
|
||||
assertEquals(1, tokens.length);
|
||||
return (Token<DelegationTokenIdentifier>) tokens[0];
|
||||
}
|
||||
enum TokenTestAction {
|
||||
RENEW, CANCEL;
|
||||
}
|
||||
|
@ -126,8 +126,8 @@ public static void createOriginalFSImage() throws IOException {
|
||||
}
|
||||
|
||||
// Get delegation tokens so we log the delegation token op
|
||||
List<Token<?>> delegationTokens =
|
||||
hdfs.getDelegationTokens(TEST_RENEWER);
|
||||
Token<?>[] delegationTokens =
|
||||
hdfs.addDelegationTokens(TEST_RENEWER, null);
|
||||
for (Token<?> t : delegationTokens) {
|
||||
LOG.debug("got token " + t);
|
||||
}
|
||||
|
@ -41,6 +41,9 @@
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import static org.mockito.Matchers.*;
|
||||
|
||||
public class TestDelegationTokenFetcher {
|
||||
private DistributedFileSystem dfs;
|
||||
@ -105,9 +108,17 @@ public void expectedTokenIsRetrievedFromDFS() throws Exception {
|
||||
|
||||
// Create a token for the fetcher to fetch, wire NN to return it when asked
|
||||
// for this particular user.
|
||||
Token<DelegationTokenIdentifier> t =
|
||||
final Token<DelegationTokenIdentifier> t =
|
||||
new Token<DelegationTokenIdentifier>(ident, pw, KIND, service);
|
||||
when(dfs.getDelegationToken(eq((String) null))).thenReturn(t);
|
||||
when(dfs.addDelegationTokens(eq((String) null), any(Credentials.class))).thenAnswer(
|
||||
new Answer<Token<?>[]>() {
|
||||
@Override
|
||||
public Token<?>[] answer(InvocationOnMock invocation) {
|
||||
Credentials creds = (Credentials)invocation.getArguments()[1];
|
||||
creds.addToken(service, t);
|
||||
return new Token<?>[]{t};
|
||||
}
|
||||
});
|
||||
when(dfs.renewDelegationToken(eq(t))).thenReturn(1000L);
|
||||
when(dfs.getUri()).thenReturn(uri);
|
||||
FakeRenewer.reset();
|
||||
|
@ -20,7 +20,6 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -110,7 +109,6 @@ static void obtainTokensForNamenodesInternal(Credentials credentials,
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||
Credentials credentials, Configuration conf) throws IOException {
|
||||
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
||||
@ -120,26 +118,11 @@ static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||
}
|
||||
mergeBinaryTokens(credentials, conf);
|
||||
|
||||
String fsName = fs.getCanonicalServiceName();
|
||||
if (TokenCache.getDelegationToken(credentials, fsName) == null) {
|
||||
List<Token<?>> tokens =
|
||||
fs.getDelegationTokens(delegTokenRenewer, credentials);
|
||||
if (tokens != null) {
|
||||
for (Token<?> token : tokens) {
|
||||
credentials.addToken(token.getService(), token);
|
||||
LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
|
||||
";t.service="+token.getService());
|
||||
}
|
||||
}
|
||||
//Call getDelegationToken as well for now - for FS implementations
|
||||
// which may not have implmented getDelegationTokens (hftp)
|
||||
if (tokens == null || tokens.size() == 0) {
|
||||
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
||||
if (token != null) {
|
||||
credentials.addToken(token.getService(), token);
|
||||
LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
|
||||
+ ";t.service=" + token.getService());
|
||||
}
|
||||
final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
|
||||
credentials);
|
||||
if (tokens != null) {
|
||||
for (Token<?> token : tokens) {
|
||||
LOG.info("Got dt for " + fs.getUri() + "; "+token);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -173,21 +156,6 @@ private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
|
||||
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
|
||||
private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
|
||||
|
||||
/**
|
||||
*
|
||||
* @param namenode
|
||||
* @return delegation token
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static Token<?> getDelegationToken(
|
||||
Credentials credentials, String namenode) {
|
||||
//No fs specific tokens issues by this fs. It may however issue tokens
|
||||
// for other filesystems - which would be keyed by that filesystems name.
|
||||
if (namenode == null)
|
||||
return null;
|
||||
return (Token<?>) credentials.getToken(new Text(namenode));
|
||||
}
|
||||
|
||||
/**
|
||||
* load job token from a file
|
||||
* @param conf
|
||||
|
@ -18,23 +18,16 @@
|
||||
|
||||
package org.apache.hadoop.mapreduce.security;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileSystemTestHelper.MockFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.Master;
|
||||
@ -43,145 +36,42 @@
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
public class TestTokenCache {
|
||||
private static Configuration conf;
|
||||
private static String renewer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
renewer = Master.getMasterPrincipal(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testGetDelegationTokensNotImplemented() throws Exception {
|
||||
public void testObtainTokens() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
|
||||
FileSystem fs = setupSingleFsWithoutGetDelegationTokens();
|
||||
FileSystem fs = mock(FileSystem.class);
|
||||
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
verify(fs).getDelegationTokens(renewer, credentials);
|
||||
verify(fs).getDelegationToken(renewer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManagedFileSystem() throws Exception {
|
||||
Credentials credentials = new Credentials();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
|
||||
FileSystem singleFs = setupSingleFs();
|
||||
FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials);
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||
assertEquals(1, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||
assertEquals(2, credentials.getAllTokens().size());
|
||||
|
||||
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||
assertEquals(2, credentials.getAllTokens().size());
|
||||
|
||||
verify(singleFs, times(1)).getDelegationTokens(renewer, credentials);
|
||||
verify(multiFs, times(2)).getDelegationTokens(renewer, credentials);
|
||||
// A call to getDelegationToken would have generated an exception.
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4");
|
||||
when(mockFs.getUri()).thenReturn(new URI("singlefs4:///"));
|
||||
|
||||
final Token<?> mockToken = (Token<?>) mock(Token.class);
|
||||
when(mockToken.getService()).thenReturn(new Text("singlefs4"));
|
||||
|
||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||
new Answer<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
||||
return mockToken;
|
||||
}
|
||||
});
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||
.thenReturn(new LinkedList<Token<?>>());
|
||||
|
||||
return mockFs;
|
||||
}
|
||||
|
||||
private FileSystem setupSingleFs() throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1");
|
||||
when(mockFs.getUri()).thenReturn(new URI("singlefs1:///"));
|
||||
|
||||
List<Token<?>> tokens = new LinkedList<Token<?>>();
|
||||
Token<?> mockToken = mock(Token.class);
|
||||
when(mockToken.getService()).thenReturn(new Text("singlefs1"));
|
||||
tokens.add(mockToken);
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||
new RuntimeException(
|
||||
"getDelegationTokens(renewer) should not be called"));
|
||||
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||
.thenReturn(tokens);
|
||||
|
||||
return mockFs;
|
||||
}
|
||||
|
||||
private FileSystem setupMultiFs(final FileSystem singleFs,
|
||||
final String renewer, final Credentials credentials) throws Exception {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn(null);
|
||||
when(mockFs.getUri()).thenReturn(new URI("multifs:///"));
|
||||
|
||||
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||
new RuntimeException(
|
||||
"getDelegationTokens(renewer) should not be called"));
|
||||
when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer(
|
||||
new Answer<List<Token<?>>>() {
|
||||
|
||||
@Override
|
||||
public List<Token<?>> answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
List<Token<?>> newTokens = new LinkedList<Token<?>>();
|
||||
if (credentials.getToken(new Text("singlefs1")) == null) {
|
||||
newTokens.addAll(singleFs.getDelegationTokens(renewer,
|
||||
credentials));
|
||||
} else {
|
||||
newTokens.add(credentials.getToken(new Text("singlefs1")));
|
||||
}
|
||||
Token<?> mockToken2 = mock(Token.class);
|
||||
when(mockToken2.getService()).thenReturn(new Text("singlefs2"));
|
||||
newTokens.add(mockToken2);
|
||||
return newTokens;
|
||||
}
|
||||
});
|
||||
|
||||
return mockFs;
|
||||
verify(fs).addDelegationTokens(eq(renewer), eq(credentials));
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("deprecation")
|
||||
public void testBinaryCredentials() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
|
||||
Path TEST_ROOT_DIR =
|
||||
new Path(System.getProperty("test.build.data","test/build/data"));
|
||||
// ick, but need fq path minus file:/
|
||||
String binaryTokenFile = FileSystem.getLocal(conf).makeQualified(
|
||||
new Path(TEST_ROOT_DIR, "tokenFile")).toUri().getPath();
|
||||
|
||||
FileSystem fs1 = createFileSystemForService("service1");
|
||||
FileSystem fs2 = createFileSystemForService("service2");
|
||||
FileSystem fs3 = createFileSystemForService("service3");
|
||||
MockFileSystem fs1 = createFileSystemForServiceName("service1");
|
||||
MockFileSystem fs2 = createFileSystemForServiceName("service2");
|
||||
MockFileSystem fs3 = createFileSystemForServiceName("service3");
|
||||
|
||||
// get the tokens for fs1 & fs2 and write out to binary creds file
|
||||
Credentials creds = new Credentials();
|
||||
@ -196,7 +86,7 @@ public void testBinaryCredentials() throws Exception {
|
||||
// re-init creds and add a newer token for fs1
|
||||
creds = new Credentials();
|
||||
Token<?> newerToken1 = fs1.getDelegationToken(renewer);
|
||||
assertFalse(newerToken1.equals(token1));
|
||||
assertNotSame(newerToken1, token1);
|
||||
creds.addToken(newerToken1.getService(), newerToken1);
|
||||
checkToken(creds, newerToken1);
|
||||
|
||||
@ -230,10 +120,9 @@ private void checkToken(Credentials creds, Token<?> ... tokens) {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private FileSystem createFileSystemForService(final String service)
|
||||
private MockFileSystem createFileSystemForServiceName(final String service)
|
||||
throws IOException {
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
MockFileSystem mockFs = new MockFileSystem();
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn(service);
|
||||
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||
new Answer<Token<?>>() {
|
||||
@ -258,7 +147,8 @@ public void testSingleTokenFetch() throws Exception {
|
||||
String renewer = Master.getMasterPrincipal(conf);
|
||||
Credentials credentials = new Credentials();
|
||||
|
||||
FileSystem mockFs = mock(FileSystem.class);
|
||||
final MockFileSystem fs = new MockFileSystem();
|
||||
final MockFileSystem mockFs = (MockFileSystem) fs.getRawFileSystem();
|
||||
when(mockFs.getCanonicalServiceName()).thenReturn("host:0");
|
||||
when(mockFs.getUri()).thenReturn(new URI("mockfs://host:0"));
|
||||
|
||||
@ -266,9 +156,9 @@ public void testSingleTokenFetch() throws Exception {
|
||||
when(mockPath.getFileSystem(conf)).thenReturn(mockFs);
|
||||
|
||||
Path[] paths = new Path[]{ mockPath, mockPath };
|
||||
when(mockFs.getDelegationTokens("me", credentials)).thenReturn(null);
|
||||
when(mockFs.addDelegationTokens("me", credentials)).thenReturn(null);
|
||||
TokenCache.obtainTokensForNamenodesInternal(credentials, paths, conf);
|
||||
verify(mockFs, times(1)).getDelegationTokens(renewer, credentials);
|
||||
verify(mockFs, times(1)).addDelegationTokens(renewer, credentials);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -278,5 +168,4 @@ public void testCleanUpTokenReferral() throws Exception {
|
||||
TokenCache.cleanUpTokenReferral(conf);
|
||||
assertNull(conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user