YARN-5305. Allow log aggregation to discard expired delegation tokens (#6625)

This commit is contained in:
Peter Szucs 2024-03-20 15:33:10 +01:00 committed by GitHub
parent 12a26d8b19
commit a957cd5049
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 173 additions and 2 deletions

View File

@ -136,6 +136,14 @@ public void addToken(Text alias, Token<? extends TokenIdentifier> t) {
} }
} }
/**
* Remove a token from the storage (in memory).
* @param alias the alias for the key
*/
public void removeToken(Text alias) {
tokenMap.remove(alias);
}
/** /**
* Return all the tokens in the in-memory map. * Return all the tokens in the in-memory map.
* *

View File

@ -1715,7 +1715,18 @@ public boolean addToken(Text alias, Token<? extends TokenIdentifier> token) {
return true; return true;
} }
} }
/**
* Remove a named token from this UGI.
*
* @param alias Name of the token
*/
public void removeToken(Text alias) {
synchronized (subject) {
getCredentialsInternal().removeToken(alias);
}
}
/** /**
* Obtain the collection of tokens associated with this user. * Obtain the collection of tokens associated with this user.
* *

View File

@ -35,8 +35,11 @@
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Sets;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -68,6 +71,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.Times;
@ -75,6 +79,8 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
public class AppLogAggregatorImpl implements AppLogAggregator { public class AppLogAggregatorImpl implements AppLogAggregator {
@ -117,6 +123,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final LogAggregationFileController logAggregationFileController; private final LogAggregationFileController logAggregationFileController;
private NMDelegationTokenManager delegationTokenManager;
/** /**
* The value recovered from state store to determine the age of application * The value recovered from state store to determine the age of application
@ -218,6 +225,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
logAggregationInRolling, logAggregationInRolling,
rollingMonitorInterval, rollingMonitorInterval,
this.appId, this.appAcls, this.nodeId, this.userUgi); this.appId, this.appAcls, this.nodeId, this.userUgi);
delegationTokenManager = new NMDelegationTokenManager(conf);
} }
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@ -286,7 +294,11 @@ private void uploadLogsForContainers(boolean appFinished)
} }
addCredentials(); addCredentials();
try {
removeExpiredDelegationTokens();
} catch (IOException | InterruptedException e) {
LOG.warn("Removing expired delegation tokens failed for " + appId, e);
}
// Create a set of Containers whose logs will be uploaded in this cycle. // Create a set of Containers whose logs will be uploaded in this cycle.
// It includes: // It includes:
// a) all containers in pendingContainers: those containers are finished // a) all containers in pendingContainers: those containers are finished
@ -431,6 +443,29 @@ private void addCredentials() {
} }
} }
private void removeExpiredDelegationTokens()
throws IOException, InterruptedException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
for (Map.Entry<Text, Token<?>> tokenEntry : userUgi.getCredentials().getTokenMap().entrySet()) {
Token<?> token = tokenEntry.getValue();
if (token.getKind().equals(HDFS_DELEGATION_KIND)) {
try {
delegationTokenManager.renewToken(token);
LOG.debug("HDFS Delegation Token for {} is successfully renewed: {}",
appId, token);
} catch (SecretManager.InvalidToken e) {
userUgi.removeToken(tokenEntry.getKey());
LOG.info("HDFS Delegation Token for {} is expired, " +
"removed from the credentials: {}", appId, token);
}
}
}
}
private void sendLogAggregationReport( private void sendLogAggregationReport(
boolean logAggregationSucceedInThisCycle, String diagnosticMessage, boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
boolean appFinished) { boolean appFinished) {

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
public class NMDelegationTokenManager {
private final Configuration conf;
public NMDelegationTokenManager(Configuration conf) {
this.conf = conf;
}
/**
* Renews a token on behalf of the user logged in.
* @param token Token to be renewed
* @return Expiration time for the token
* @throws IOException raised on errors performing I/O.
* @throws InterruptedException if the thread is interrupted.
*/
public Long renewToken(Token<? extends TokenIdentifier> token)
throws IOException, InterruptedException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
return ugi.doAs((PrivilegedExceptionAction<Long>) () -> token.renew(conf));
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -51,6 +52,7 @@
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -83,6 +85,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@ -2588,6 +2591,71 @@ public Boolean get() {
logAggregationService.stop(); logAggregationService.stop();
} }
@Test (timeout = 20000)
public void testRemoveExpiredDelegationTokensBeforeUpload() throws Exception {
Configuration conf = new YarnConfiguration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
UserGroupInformation.setConfiguration(conf);
ApplicationId applicationId = BuilderUtils.newApplicationId(1234, 1);
Application application = mockApplication();
this.context.getApplications().put(applicationId, application);
@SuppressWarnings("resource")
LogAggregationService logAggregationService =
new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.start();
logAggregationService.handle(new LogHandlerAppStartedEvent(applicationId,
this.user, null, this.acls,
Records.newRecord(LogAggregationContext.class)));
// Adding a valid and an expired delegation token to the credentials
Token renewableToken = mockRenewableToken();
Token expiredToken = mockExpiredToken();
Credentials credentials = new Credentials();
credentials.addToken(new Text("renewableToken"), renewableToken);
credentials.addToken(new Text("expiredToken"), expiredToken);
UserGroupInformation ugi =
((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
.get(applicationId)).getUgi();
ugi.addCredentials(credentials);
logAggregationService.handle(new LogHandlerAppFinishedEvent(applicationId));
GenericTestUtils.waitFor(() -> {
Collection<Token<? extends TokenIdentifier>> tokens = ugi.getCredentials().getAllTokens();
return tokens.size() == 1 && tokens.contains(renewableToken);
}, 1000, 20000);
logAggregationService.stop();
}
private Application mockApplication() {
Application mockApp = mock(Application.class);
when(mockApp.getContainers()).thenReturn(
new HashMap<ContainerId, Container>());
return mockApp;
}
private Token mockRenewableToken() throws IOException, InterruptedException {
Token renewableToken = mock(Token.class);
when(renewableToken.getKind()).thenReturn(HDFS_DELEGATION_KIND);
when(renewableToken.renew(this.conf)).thenReturn(0L);
return renewableToken;
}
private Token mockExpiredToken() throws IOException, InterruptedException {
Token expiredToken = mock(Token.class);
when(expiredToken.getKind()).thenReturn(HDFS_DELEGATION_KIND);
when(expiredToken.renew(this.conf))
.thenThrow(new SecretManager.InvalidToken(""));
return expiredToken;
}
@Test (timeout = 20000) @Test (timeout = 20000)
public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception { public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception {
LogAggregationContext logAggregationContext = LogAggregationContext logAggregationContext =