HDFS-9276. Failed to Update HDFS Delegation Token for long running application in HA mode. Contributed by Liangliang Gu and John Zhuge
This commit is contained in:
parent
ce3d68e9c3
commit
d9aae22fdf
@ -95,10 +95,24 @@ public Token<? extends TokenIdentifier> getToken(Text alias) {
|
||||
* @param t the token object
|
||||
*/
|
||||
public void addToken(Text alias, Token<? extends TokenIdentifier> t) {
|
||||
if (t != null) {
|
||||
tokenMap.put(alias, t);
|
||||
} else {
|
||||
if (t == null) {
|
||||
LOG.warn("Null token ignored for " + alias);
|
||||
} else if (tokenMap.put(alias, t) != null) {
|
||||
// Update private tokens
|
||||
Map<Text, Token<? extends TokenIdentifier>> tokensToAdd =
|
||||
new HashMap<>();
|
||||
for (Map.Entry<Text, Token<? extends TokenIdentifier>> e :
|
||||
tokenMap.entrySet()) {
|
||||
Token<? extends TokenIdentifier> token = e.getValue();
|
||||
if (token instanceof Token.PrivateToken &&
|
||||
((Token.PrivateToken) token).getPublicService().equals(alias)) {
|
||||
Token<? extends TokenIdentifier> privateToken =
|
||||
new Token.PrivateToken<>(t);
|
||||
privateToken.setService(token.getService());
|
||||
tokensToAdd.put(e.getKey(), privateToken);
|
||||
}
|
||||
}
|
||||
tokenMap.putAll(tokensToAdd);
|
||||
}
|
||||
}
|
||||
|
||||
@ -397,7 +411,7 @@ private void addAll(Credentials other, boolean overwrite) {
|
||||
for(Map.Entry<Text, Token<?>> token: other.tokenMap.entrySet()){
|
||||
Text key = token.getKey();
|
||||
if (!tokenMap.containsKey(key) || overwrite) {
|
||||
tokenMap.put(key, token.getValue());
|
||||
addToken(key, token.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -96,10 +96,10 @@ public Token() {
|
||||
* @param other the token to clone
|
||||
*/
|
||||
public Token(Token<T> other) {
|
||||
this.identifier = other.identifier;
|
||||
this.password = other.password;
|
||||
this.kind = other.kind;
|
||||
this.service = other.service;
|
||||
this.identifier = other.identifier.clone();
|
||||
this.password = other.password.clone();
|
||||
this.kind = new Text(other.kind);
|
||||
this.service = new Text(other.service);
|
||||
}
|
||||
|
||||
public Token<T> copyToken() {
|
||||
@ -230,8 +230,37 @@ public void setService(Text newService) {
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public static class PrivateToken<T extends TokenIdentifier> extends Token<T> {
|
||||
final private Text publicService;
|
||||
|
||||
public PrivateToken(Token<T> token) {
|
||||
super(token);
|
||||
publicService = new Text(token.getService());
|
||||
}
|
||||
|
||||
public Text getPublicService() {
|
||||
return publicService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
if (!super.equals(o)) {
|
||||
return false;
|
||||
}
|
||||
PrivateToken<?> that = (PrivateToken<?>) o;
|
||||
return publicService.equals(that.publicService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = super.hashCode();
|
||||
result = 31 * result + publicService.hashCode();
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -367,6 +368,37 @@ public void testHdfsGetCanonicalServiceName() throws Exception {
|
||||
token.cancel(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testCancelAndUpdateDelegationTokens() throws Exception {
|
||||
// Create UGI with token1
|
||||
String user = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user);
|
||||
|
||||
ugi1.doAs(new PrivilegedExceptionAction<Void>() {
|
||||
public Void run() throws Exception {
|
||||
final Token<DelegationTokenIdentifier> token1 =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
UserGroupInformation.getCurrentUser()
|
||||
.addToken(token1.getService(), token1);
|
||||
|
||||
FileSystem fs1 = HATestUtil.configureFailoverFs(cluster, conf);
|
||||
|
||||
// Cancel token1
|
||||
doRenewOrCancel(token1, conf, TokenTestAction.CANCEL);
|
||||
|
||||
// Update UGI with token2
|
||||
final Token<DelegationTokenIdentifier> token2 =
|
||||
getDelegationToken(fs, "JobTracker");
|
||||
UserGroupInformation.getCurrentUser()
|
||||
.addToken(token2.getService(), token2);
|
||||
|
||||
// Check whether token2 works
|
||||
fs1.listFiles(new Path("/"), false);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Token<DelegationTokenIdentifier> getDelegationToken(FileSystem fs,
|
||||
String renewer) throws IOException {
|
||||
|
Loading…
Reference in New Issue
Block a user