MAPREDUCE-7073. Optimize TokenCache#obtainTokensForNamenodesInternal
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
37269261d1
commit
1a95a4524a
@ -22,6 +22,7 @@
|
|||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -96,8 +97,9 @@ static void obtainTokensForNamenodesInternal(Credentials credentials,
|
|||||||
for(Path p: ps) {
|
for(Path p: ps) {
|
||||||
fsSet.add(p.getFileSystem(conf));
|
fsSet.add(p.getFileSystem(conf));
|
||||||
}
|
}
|
||||||
|
String masterPrincipal = Master.getMasterPrincipal(conf);
|
||||||
for (FileSystem fs : fsSet) {
|
for (FileSystem fs : fsSet) {
|
||||||
obtainTokensForNamenodesInternal(fs, credentials, conf);
|
obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,15 +124,17 @@ static boolean isTokenRenewalExcluded(FileSystem fs, Configuration conf) {
|
|||||||
* @param conf
|
* @param conf
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
static void obtainTokensForNamenodesInternal(FileSystem fs,
|
||||||
Credentials credentials, Configuration conf) throws IOException {
|
Credentials credentials, Configuration conf, String renewer)
|
||||||
|
throws IOException {
|
||||||
// RM skips renewing token with empty renewer
|
// RM skips renewing token with empty renewer
|
||||||
String delegTokenRenewer = "";
|
String delegTokenRenewer = "";
|
||||||
if (!isTokenRenewalExcluded(fs, conf)) {
|
if (!isTokenRenewalExcluded(fs, conf)) {
|
||||||
delegTokenRenewer = Master.getMasterPrincipal(conf);
|
if (StringUtils.isEmpty(renewer)) {
|
||||||
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
|
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Can't get Master Kerberos principal for use as renewer");
|
"Can't get Master Kerberos principal for use as renewer");
|
||||||
|
} else {
|
||||||
|
delegTokenRenewer = renewer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,8 +56,8 @@ public static void setup() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testObtainTokens() throws Exception {
|
public void testObtainTokens() throws Exception {
|
||||||
Credentials credentials = new Credentials();
|
Credentials credentials = new Credentials();
|
||||||
FileSystem fs = mock(FileSystem.class);
|
FileSystem fs = mock(FileSystem.class);
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs, credentials, conf, renewer);
|
||||||
verify(fs).addDelegationTokens(eq(renewer), eq(credentials));
|
verify(fs).addDelegationTokens(eq(renewer), eq(credentials));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,23 +105,23 @@ private void testBinaryCredentials(boolean hasScheme) throws Exception {
|
|||||||
checkToken(creds, newerToken1);
|
checkToken(creds, newerToken1);
|
||||||
|
|
||||||
// get token for fs1, see that fs2's token was loaded
|
// get token for fs1, see that fs2's token was loaded
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
|
||||||
checkToken(creds, newerToken1, token2);
|
checkToken(creds, newerToken1, token2);
|
||||||
|
|
||||||
// get token for fs2, nothing should change since already present
|
// get token for fs2, nothing should change since already present
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf, renewer);
|
||||||
checkToken(creds, newerToken1, token2);
|
checkToken(creds, newerToken1, token2);
|
||||||
|
|
||||||
// get token for fs3, should only add token for fs3
|
// get token for fs3, should only add token for fs3
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf, renewer);
|
||||||
Token<?> token3 = creds.getToken(new Text(fs3.getCanonicalServiceName()));
|
Token<?> token3 = creds.getToken(new Text(fs3.getCanonicalServiceName()));
|
||||||
assertTrue(token3 != null);
|
assertTrue(token3 != null);
|
||||||
checkToken(creds, newerToken1, token2, token3);
|
checkToken(creds, newerToken1, token2, token3);
|
||||||
|
|
||||||
// be paranoid, check one last time that nothing changes
|
// be paranoid, check one last time that nothing changes
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs2, creds, conf, renewer);
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs3, creds, conf, renewer);
|
||||||
checkToken(creds, newerToken1, token2, token3);
|
checkToken(creds, newerToken1, token2, token3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -202,7 +202,7 @@ public void testGetTokensForNamenodes() throws IOException,
|
|||||||
// wait to set, else the obtain tokens call above will fail with FNF
|
// wait to set, else the obtain tokens call above will fail with FNF
|
||||||
conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile);
|
conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, binaryTokenFile);
|
||||||
creds.writeTokenStorageFile(new Path(binaryTokenFile), conf);
|
creds.writeTokenStorageFile(new Path(binaryTokenFile), conf);
|
||||||
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf);
|
TokenCache.obtainTokensForNamenodesInternal(fs1, creds, conf, renewer);
|
||||||
String fs_addr = fs1.getCanonicalServiceName();
|
String fs_addr = fs1.getCanonicalServiceName();
|
||||||
Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr);
|
Token<?> nnt = TokenCache.getDelegationToken(creds, fs_addr);
|
||||||
assertNotNull("Token for nn is null", nnt);
|
assertNotNull("Token for nn is null", nnt);
|
||||||
|
Loading…
Reference in New Issue
Block a user