From a957cd5049326e4ff1aa6db68769772a70c9ee45 Mon Sep 17 00:00:00 2001 From: Peter Szucs <116345192+p-szucs@users.noreply.github.com> Date: Wed, 20 Mar 2024 15:33:10 +0100 Subject: [PATCH] YARN-5305. Allow log aggregation to discard expired delegation tokens (#6625) --- .../apache/hadoop/security/Credentials.java | 8 +++ .../hadoop/security/UserGroupInformation.java | 13 +++- .../logaggregation/AppLogAggregatorImpl.java | 37 +++++++++- .../security/NMDelegationTokenManager.java | 49 +++++++++++++ .../TestLogAggregationService.java | 68 +++++++++++++++++++ 5 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java index bbbcc95288..77f43fbdcb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java @@ -136,6 +136,14 @@ public void addToken(Text alias, Token t) { } } + /** + * Remove a token from the storage (in memory). + * @param alias the alias for the key + */ + public void removeToken(Text alias) { + tokenMap.remove(alias); + } + /** * Return all the tokens in the in-memory map. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java index 8a5a0ee234..07be1f8e54 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java @@ -1715,7 +1715,18 @@ public boolean addToken(Text alias, Token token) { return true; } } - + + /** + * Remove a named token from this UGI. + * + * @param alias Name of the token + */ + public void removeToken(Text alias) { + synchronized (subject) { + getCredentialsInternal().removeToken(alias); + } + } + /** * Obtain the collection of tokens associated with this user. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 1ba7353a1e..ce6397e390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -35,8 +35,11 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.StringUtils; @@ -68,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask; import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask; +import org.apache.hadoop.yarn.server.nodemanager.security.NMDelegationTokenManager; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -75,6 +79,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND; + public class AppLogAggregatorImpl implements AppLogAggregator { @@ -117,6 +123,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final LogAggregationFileController logAggregationFileController; + private NMDelegationTokenManager delegationTokenManager; /** * The value recovered from state store to determine the age of application @@ -218,6 +225,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher, logAggregationInRolling, rollingMonitorInterval, this.appId, this.appAcls, this.nodeId, this.userUgi); + delegationTokenManager = new NMDelegationTokenManager(conf); } private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) { @@ -286,7 +294,11 @@ private void uploadLogsForContainers(boolean appFinished) } addCredentials(); - + try { + removeExpiredDelegationTokens(); + } catch (IOException | InterruptedException e) { + LOG.warn("Removing expired delegation tokens failed for " + appId, e); + } // Create a set of Containers whose logs will be uploaded in this cycle. // It includes: // a) all containers in pendingContainers: those containers are finished @@ -431,6 +443,29 @@ private void addCredentials() { } } + private void removeExpiredDelegationTokens() + throws IOException, InterruptedException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + + for (Map.Entry> tokenEntry : userUgi.getCredentials().getTokenMap().entrySet()) { + Token token = tokenEntry.getValue(); + + if (token.getKind().equals(HDFS_DELEGATION_KIND)) { + try { + delegationTokenManager.renewToken(token); + LOG.debug("HDFS Delegation Token for {} is successfully renewed: {}", + appId, token); + } catch (SecretManager.InvalidToken e) { + userUgi.removeToken(tokenEntry.getKey()); + LOG.info("HDFS Delegation Token for {} is expired, " + + "removed from the credentials: {}", appId, token); + } + } + } + } + private void sendLogAggregationReport( boolean logAggregationSucceedInThisCycle, String diagnosticMessage, boolean appFinished) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java new file mode 100644 index 0000000000..d30a411473 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMDelegationTokenManager.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.security; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +public class NMDelegationTokenManager { + + private final Configuration conf; + + public NMDelegationTokenManager(Configuration conf) { + this.conf = conf; + } + + /** + * Renews a token on behalf of the user logged in. + * @param token Token to be renewed + * @return Expiration time for the token + * @throws IOException raised on errors performing I/O. + * @throws InterruptedException if the thread is interrupted. + */ + public Long renewToken(Token token) + throws IOException, InterruptedException { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + return ugi.doAs((PrivilegedExceptionAction) () -> token.renew(conf)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 746826a136..72138e7199 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation; +import static org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.HDFS_DELEGATION_KIND; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -51,6 +52,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -83,6 +85,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; @@ -2588,6 +2591,71 @@ public Boolean get() { logAggregationService.stop(); } + @Test (timeout = 20000) + public void testRemoveExpiredDelegationTokensBeforeUpload() throws Exception { + Configuration conf = new YarnConfiguration(); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + ApplicationId applicationId = BuilderUtils.newApplicationId(1234, 1); + Application application = mockApplication(); + this.context.getApplications().put(applicationId, application); + + @SuppressWarnings("resource") + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler); + logAggregationService.init(this.conf); + logAggregationService.start(); + + logAggregationService.handle(new LogHandlerAppStartedEvent(applicationId, + this.user, null, this.acls, + Records.newRecord(LogAggregationContext.class))); + + // Adding a valid and an expired delegation token to the credentials + Token renewableToken = mockRenewableToken(); + Token expiredToken = mockExpiredToken(); + + Credentials credentials = new Credentials(); + credentials.addToken(new Text("renewableToken"), renewableToken); + credentials.addToken(new Text("expiredToken"), expiredToken); + + UserGroupInformation ugi = + ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators() + .get(applicationId)).getUgi(); + ugi.addCredentials(credentials); + + logAggregationService.handle(new LogHandlerAppFinishedEvent(applicationId)); + + GenericTestUtils.waitFor(() -> { + Collection> tokens = ugi.getCredentials().getAllTokens(); + return tokens.size() == 1 && tokens.contains(renewableToken); + }, 1000, 20000); + logAggregationService.stop(); + } + + private Application mockApplication() { + Application mockApp = mock(Application.class); + when(mockApp.getContainers()).thenReturn( + new HashMap()); + return mockApp; + } + + private Token mockRenewableToken() throws IOException, InterruptedException { + Token renewableToken = mock(Token.class); + when(renewableToken.getKind()).thenReturn(HDFS_DELEGATION_KIND); + when(renewableToken.renew(this.conf)).thenReturn(0L); + return renewableToken; + } + + private Token mockExpiredToken() throws IOException, InterruptedException { + Token expiredToken = mock(Token.class); + when(expiredToken.getKind()).thenReturn(HDFS_DELEGATION_KIND); + when(expiredToken.renew(this.conf)) + .thenThrow(new SecretManager.InvalidToken("")); + return expiredToken; + } + @Test (timeout = 20000) public void testSkipUnnecessaryNNOperationsForShortJob() throws Exception { LogAggregationContext logAggregationContext =