MAPREDUCE-3529. TokenCache does not cache viewfs credentials correctly. (sseth)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227374 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c27de4bd0b
commit
f445eb31f1
@ -395,6 +395,9 @@ Release 0.23.1 - Unreleased
|
|||||||
MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when
|
MAPREDUCE-1744. DistributedCache creates its own FileSytem instance when
|
||||||
adding a file/archive to the path. (Dick King via tucu)
|
adding a file/archive to the path. (Dick King via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-3529. TokenCache does not cache viewfs credentials correctly
|
||||||
|
(sseth)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -19,9 +19,7 @@
|
|||||||
package org.apache.hadoop.mapreduce.security;
|
package org.apache.hadoop.mapreduce.security;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
@ -35,9 +33,7 @@
|
|||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapred.Master;
|
import org.apache.hadoop.mapred.Master;
|
||||||
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
@ -99,6 +95,7 @@ static void obtainTokensForNamenodesInternal(Credentials credentials,
|
|||||||
* @param conf
|
* @param conf
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||||
Credentials credentials, Configuration conf) throws IOException {
|
Credentials credentials, Configuration conf) throws IOException {
|
||||||
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
String delegTokenRenewer = Master.getMasterPrincipal(conf);
|
||||||
@ -131,7 +128,8 @@ static void obtainTokensForNamenodesInternal(FileSystem fs,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
List<Token<?>> tokens = fs.getDelegationTokens(delegTokenRenewer);
|
List<Token<?>> tokens =
|
||||||
|
fs.getDelegationTokens(delegTokenRenewer, credentials);
|
||||||
if (tokens != null) {
|
if (tokens != null) {
|
||||||
for (Token<?> token : tokens) {
|
for (Token<?> token : tokens) {
|
||||||
credentials.addToken(token.getService(), token);
|
credentials.addToken(token.getService(), token);
|
||||||
@ -141,13 +139,13 @@ static void obtainTokensForNamenodesInternal(FileSystem fs,
|
|||||||
}
|
}
|
||||||
//Call getDelegationToken as well for now - for FS implementations
|
//Call getDelegationToken as well for now - for FS implementations
|
||||||
// which may not have implmented getDelegationTokens (hftp)
|
// which may not have implmented getDelegationTokens (hftp)
|
||||||
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
if (tokens == null || tokens.size() == 0) {
|
||||||
if (token != null) {
|
Token<?> token = fs.getDelegationToken(delegTokenRenewer);
|
||||||
Text fsNameText = new Text(fsName);
|
if (token != null) {
|
||||||
token.setService(fsNameText);
|
credentials.addToken(token.getService(), token);
|
||||||
credentials.addToken(fsNameText, token);
|
LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
|
||||||
LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
|
+ ";t.service=" + token.getService());
|
||||||
";t.service="+token.getService());
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,161 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.security;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.mapred.Master;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
public class TestTokenCache {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public void testGetDelegationTokensNotImplemented() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||||
|
String renewer = Master.getMasterPrincipal(conf);
|
||||||
|
|
||||||
|
FileSystem fs = setupSingleFsWithoutGetDelegationTokens();
|
||||||
|
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
||||||
|
assertEquals(1, credentials.getAllTokens().size());
|
||||||
|
|
||||||
|
verify(fs).getDelegationTokens(renewer, credentials);
|
||||||
|
verify(fs).getDelegationToken(renewer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testManagedFileSystem() throws Exception {
|
||||||
|
Credentials credentials = new Credentials();
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_PRINCIPAL, "mapred/host@REALM");
|
||||||
|
String renewer = Master.getMasterPrincipal(conf);
|
||||||
|
|
||||||
|
FileSystem singleFs = setupSingleFs();
|
||||||
|
FileSystem multiFs = setupMultiFs(singleFs, renewer, credentials);
|
||||||
|
|
||||||
|
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||||
|
assertEquals(1, credentials.getAllTokens().size());
|
||||||
|
|
||||||
|
TokenCache.obtainTokensForNamenodesInternal(singleFs, credentials, conf);
|
||||||
|
assertEquals(1, credentials.getAllTokens().size());
|
||||||
|
|
||||||
|
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||||
|
assertEquals(2, credentials.getAllTokens().size());
|
||||||
|
|
||||||
|
TokenCache.obtainTokensForNamenodesInternal(multiFs, credentials, conf);
|
||||||
|
assertEquals(2, credentials.getAllTokens().size());
|
||||||
|
|
||||||
|
verify(singleFs, times(1)).getDelegationTokens(renewer, credentials);
|
||||||
|
verify(multiFs, times(2)).getDelegationTokens(renewer, credentials);
|
||||||
|
// A call to getDelegationToken would have generated an exception.
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
private FileSystem setupSingleFsWithoutGetDelegationTokens() throws Exception {
|
||||||
|
FileSystem mockFs = mock(FileSystem.class);
|
||||||
|
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs4");
|
||||||
|
when(mockFs.getUri()).thenReturn(new URI("singlefs4:///"));
|
||||||
|
|
||||||
|
final Token<?> mockToken = (Token<?>) mock(Token.class);
|
||||||
|
when(mockToken.getService()).thenReturn(new Text("singlefs4"));
|
||||||
|
|
||||||
|
when(mockFs.getDelegationToken(any(String.class))).thenAnswer(
|
||||||
|
new Answer<Token<?>>() {
|
||||||
|
@Override
|
||||||
|
public Token<?> answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
return mockToken;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||||
|
.thenReturn(new LinkedList<Token<?>>());
|
||||||
|
|
||||||
|
return mockFs;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileSystem setupSingleFs() throws Exception {
|
||||||
|
FileSystem mockFs = mock(FileSystem.class);
|
||||||
|
when(mockFs.getCanonicalServiceName()).thenReturn("singlefs1");
|
||||||
|
when(mockFs.getUri()).thenReturn(new URI("singlefs1:///"));
|
||||||
|
|
||||||
|
List<Token<?>> tokens = new LinkedList<Token<?>>();
|
||||||
|
Token<?> mockToken = mock(Token.class);
|
||||||
|
when(mockToken.getService()).thenReturn(new Text("singlefs1"));
|
||||||
|
tokens.add(mockToken);
|
||||||
|
|
||||||
|
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||||
|
new RuntimeException(
|
||||||
|
"getDelegationTokens(renewer) should not be called"));
|
||||||
|
when(mockFs.getDelegationTokens(any(String.class), any(Credentials.class)))
|
||||||
|
.thenReturn(tokens);
|
||||||
|
|
||||||
|
return mockFs;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FileSystem setupMultiFs(final FileSystem singleFs,
|
||||||
|
final String renewer, final Credentials credentials) throws Exception {
|
||||||
|
FileSystem mockFs = mock(FileSystem.class);
|
||||||
|
when(mockFs.getCanonicalServiceName()).thenReturn("multifs");
|
||||||
|
when(mockFs.getUri()).thenReturn(new URI("multifs:///"));
|
||||||
|
|
||||||
|
when(mockFs.getDelegationTokens(any(String.class))).thenThrow(
|
||||||
|
new RuntimeException(
|
||||||
|
"getDelegationTokens(renewer) should not be called"));
|
||||||
|
when(mockFs.getDelegationTokens(renewer, credentials)).thenAnswer(
|
||||||
|
new Answer<List<Token<?>>>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Token<?>> answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
List<Token<?>> newTokens = new LinkedList<Token<?>>();
|
||||||
|
if (credentials.getToken(new Text("singlefs1")) == null) {
|
||||||
|
newTokens.addAll(singleFs.getDelegationTokens(renewer,
|
||||||
|
credentials));
|
||||||
|
} else {
|
||||||
|
newTokens.add(credentials.getToken(new Text("singlefs1")));
|
||||||
|
}
|
||||||
|
Token<?> mockToken2 = mock(Token.class);
|
||||||
|
when(mockToken2.getService()).thenReturn(new Text("singlefs2"));
|
||||||
|
newTokens.add(mockToken2);
|
||||||
|
return newTokens;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return mockFs;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user