From 4af4997e1129d3b9bd7fde7ec8731d6e79093fd8 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Fri, 2 Dec 2022 05:20:21 +0800 Subject: [PATCH] YARN-11158. Support (Create/Renew/Cancel) DelegationToken API's for Federation. (#5104) --- .../yarn/server/router/RouterMetrics.java | 97 ++++++++++- .../yarn/server/router/RouterServerUtil.java | 24 +++ .../clientrm/FederationClientInterceptor.java | 97 ++++++++++- .../yarn/server/router/TestRouterMetrics.java | 80 ++++++++- .../TestFederationClientInterceptor.java | 156 +++++++++++++++++- .../TestableFederationClientInterceptor.java | 28 ++++ 6 files changed, 468 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index b03aeda38b..31d838d1b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -127,6 +127,12 @@ public final class RouterMetrics { private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved; @Metric("# of checkUserAccessToQueue failed to be retrieved") private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved; + @Metric("# of getDelegationToken failed to be retrieved") + private MutableGaugeInt numGetDelegationTokenFailedRetrieved; + @Metric("# of renewDelegationToken failed to be retrieved") + private MutableGaugeInt numRenewDelegationTokenFailedRetrieved; + @Metric("# of cancelDelegationToken failed to be retrieved") + private MutableGaugeInt numCancelDelegationTokenFailedRetrieved; // Aggregate metrics are shared, 
and don't have to be looked up per call @Metric("Total number of successful Submitted apps and latency(ms)") @@ -215,6 +221,12 @@ public final class RouterMetrics { private MutableRate totalSucceededGetRMNodeLabelsRetrieved; @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)") private MutableRate totalSucceededCheckUserAccessToQueueRetrieved; + @Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)") + private MutableRate totalSucceededGetDelegationTokenRetrieved; + @Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)") + private MutableRate totalSucceededRenewDelegationTokenRetrieved; + @Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)") + private MutableRate totalSucceededCancelDelegationTokenRetrieved; /** * Provide quantile counters for all latencies. @@ -262,6 +274,9 @@ public final class RouterMetrics { private MutableQuantiles getRefreshQueuesLatency; private MutableQuantiles getRMNodeLabelsLatency; private MutableQuantiles checkUserAccessToQueueLatency; + private MutableQuantiles getDelegationTokenLatency; + private MutableQuantiles renewDelegationTokenLatency; + private MutableQuantiles cancelDelegationTokenLatency; private static volatile RouterMetrics instance = null; private static MetricsRegistry registry; @@ -423,6 +438,15 @@ private RouterMetrics() { checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency", "latency of get apptimeouts timeouts", "ops", "latency", 10); + + getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency", + "latency of get delegation token timeouts", "ops", "latency", 10); + + renewDelegationTokenLatency = registry.newQuantiles("renewDelegationTokenLatency", + "latency of renew delegation token timeouts", "ops", "latency", 10); + + cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency", + "latency of cancel delegation token 
timeouts", "ops", "latency", 10); } public static RouterMetrics getMetrics() { @@ -655,10 +679,25 @@ public long getNumSucceededGetRMNodeLabelsRetrieved() { } @VisibleForTesting - public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() { + public long getNumSucceededCheckUserAccessToQueueRetrieved() { return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededGetDelegationTokenRetrieved() { + return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededRenewDelegationTokenRetrieved() { + return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples(); + } + + @VisibleForTesting + public long getNumSucceededCancelDelegationTokenRetrieved() { + return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public double getLatencySucceededAppsCreated() { return totalSucceededAppsCreated.lastStat().mean(); @@ -874,6 +913,21 @@ public double getLatencySucceededCheckUserAccessToQueueRetrieved() { return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededGetDelegationTokenRetrieved() { + return totalSucceededGetDelegationTokenRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededRenewDelegationTokenRetrieved() { + return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean(); + } + + @VisibleForTesting + public double getLatencySucceededCancelDelegationTokenRetrieved() { + return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean(); + } + @VisibleForTesting public int getAppsFailedCreated() { return numAppsFailedCreated.value(); @@ -1068,6 +1122,18 @@ public int getCheckUserAccessToQueueFailedRetrieved() { return numCheckUserAccessToQueueFailedRetrieved.value(); } + public int getDelegationTokenFailedRetrieved() { + return 
numGetDelegationTokenFailedRetrieved.value(); + } + + public int getRenewDelegationTokenFailedRetrieved() { + return numRenewDelegationTokenFailedRetrieved.value(); + } + + public int getCancelDelegationTokenFailedRetrieved() { + return numCancelDelegationTokenFailedRetrieved.value(); + } + public void succeededAppsCreated(long duration) { totalSucceededAppsCreated.add(duration); getNewApplicationLatency.add(duration); @@ -1283,6 +1349,21 @@ public void succeededCheckUserAccessToQueueRetrieved(long duration) { checkUserAccessToQueueLatency.add(duration); } + public void succeededGetDelegationTokenRetrieved(long duration) { + totalSucceededGetDelegationTokenRetrieved.add(duration); + getDelegationTokenLatency.add(duration); + } + + public void succeededRenewDelegationTokenRetrieved(long duration) { + totalSucceededRenewDelegationTokenRetrieved.add(duration); + renewDelegationTokenLatency.add(duration); + } + + public void succeededCancelDelegationTokenRetrieved(long duration) { + totalSucceededCancelDelegationTokenRetrieved.add(duration); + cancelDelegationTokenLatency.add(duration); + } + public void incrAppsFailedCreated() { numAppsFailedCreated.incr(); } @@ -1454,4 +1535,16 @@ public void incrGetRMNodeLabelsFailedRetrieved() { public void incrCheckUserAccessToQueueFailedRetrieved() { numCheckUserAccessToQueueFailedRetrieved.incr(); } -} \ No newline at end of file + + public void incrGetDelegationTokenFailedRetrieved() { + numGetDelegationTokenFailedRetrieved.incr(); + } + + public void incrRenewDelegationTokenFailedRetrieved() { + numRenewDelegationTokenFailedRetrieved.incr(); + } + + public void incrCancelDelegationTokenFailedRetrieved() { + numCancelDelegationTokenFailedRetrieved.incr(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 93818229dd..8c880f25dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -24,10 +24,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +38,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.EnumSet; import java.io.IOException; /** @@ -470,6 +473,27 @@ public static void validateContainerId(String containerId) } } + public static boolean isAllowedDelegationTokenOp() throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS, + UserGroupInformation.AuthenticationMethod.KERBEROS_SSL, + UserGroupInformation.AuthenticationMethod.CERTIFICATE) + .contains(UserGroupInformation.getCurrentUser() + .getRealAuthenticationMethod()); + } else { + return true; + } + } + + public static String getRenewerForToken(Token token) + throws IOException { + UserGroupInformation user = UserGroupInformation.getCurrentUser(); + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + // we can always renew 
our own tokens + return loginUser.getUserName().equals(user.getUserName()) + ? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName(); + } + public static UserGroupInformation setupUser(final String userName) { UserGroupInformation user = null; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index cf457c7077..a50ea5bc42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.io.Text; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.lang.reflect.Method; @@ -40,7 +41,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.security.UserGroupInformation; @@ -118,9 +118,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + import 
org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; @@ -136,6 +140,7 @@ import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.MonotonicClock; +import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1392,19 +1397,100 @@ public GetContainersResponse getContainers(GetContainersRequest request) @Override public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + + if (request == null || request.getRenewer() == null) { + routerMetrics.incrGetDelegationTokenFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing getDelegationToken request or Renewer.", null); + } + + try { + // Verify that the connection is kerberos authenticated + if (!RouterServerUtil.isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be issued only with kerberos authentication."); + } + + long startTime = clock.getTime(); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + Text owner = new Text(ugi.getUserName()); + Text realUser = null; + if (ugi.getRealUser() != null) { + realUser = new Text(ugi.getRealUser().getUserName()); + } + + RMDelegationTokenIdentifier tokenIdentifier = + new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser); + Token realRMDToken = + new 
Token<>(tokenIdentifier, this.getTokenSecretManager()); + + org.apache.hadoop.yarn.api.records.Token routerRMDTToken = + BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(), + realRMDToken.getKind().toString(), + realRMDToken.getPassword(), realRMDToken.getService().toString()); + + long stopTime = clock.getTime(); + routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime)); + return GetDelegationTokenResponse.newInstance(routerRMDTToken); + } catch(IOException e) { + routerMetrics.incrGetDelegationTokenFailedRetrieved(); + throw new YarnException(e); + } } @Override public RenewDelegationTokenResponse renewDelegationToken( RenewDelegationTokenRequest request) throws YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + try { + + if (!RouterServerUtil.isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be renewed only with kerberos authentication"); + } + + long startTime = clock.getTime(); + org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); + Token token = new Token<>( + protoToken.getIdentifier().array(), protoToken.getPassword().array(), + new Text(protoToken.getKind()), new Text(protoToken.getService())); + String user = RouterServerUtil.getRenewerForToken(token); + long nextExpTime = this.getTokenSecretManager().renewToken(token, user); + RenewDelegationTokenResponse renewResponse = + Records.newRecord(RenewDelegationTokenResponse.class); + renewResponse.setNextExpirationTime(nextExpTime); + long stopTime = clock.getTime(); + routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime)); + return renewResponse; + + } catch (IOException e) { + routerMetrics.incrRenewDelegationTokenFailedRetrieved(); + throw new YarnException(e); + } } @Override public CancelDelegationTokenResponse cancelDelegationToken( CancelDelegationTokenRequest request) throws 
YarnException, IOException { - throw new NotImplementedException("Code is not implemented"); + try { + if (!RouterServerUtil.isAllowedDelegationTokenOp()) { + throw new IOException( + "Delegation Token can be cancelled only with kerberos authentication"); + } + + long startTime = clock.getTime(); + org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken(); + Token token = new Token<>( + protoToken.getIdentifier().array(), protoToken.getPassword().array(), + new Text(protoToken.getKind()), new Text(protoToken.getService())); + String user = UserGroupInformation.getCurrentUser().getUserName(); + this.getTokenSecretManager().cancelToken(token, user); + long stopTime = clock.getTime(); + routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime)); + return Records.newRecord(CancelDelegationTokenResponse.class); + } catch (IOException e) { + routerMetrics.incrCancelDelegationTokenFailedRetrieved(); + throw new YarnException(e); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index 828e5c69f3..9d5aeab5c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -519,10 +519,20 @@ public void getRMNodeLabelsFailed() { metrics.incrGetRMNodeLabelsFailedRetrieved(); } - public void getCheckUserAccessToQueueRetrieved() { - LOG.info("Mocked: failed checkUserAccessToQueueRetrieved call"); + public void getCheckUserAccessToQueueFailed() 
{ + LOG.info("Mocked: failed checkUserAccessToQueue call"); metrics.incrCheckUserAccessToQueueFailedRetrieved(); } + + public void getDelegationTokenFailed() { + LOG.info("Mocked: failed getDelegationToken call"); + metrics.incrGetDelegationTokenFailedRetrieved(); + } + + public void getRenewDelegationTokenFailed() { + LOG.info("Mocked: failed renewDelegationToken call"); + metrics.incrRenewDelegationTokenFailedRetrieved(); + } } // Records successes for all calls @@ -743,6 +753,16 @@ public void getCheckUserAccessToQueueRetrieved(long duration) { LOG.info("Mocked: successful CheckUserAccessToQueue call with duration {}", duration); metrics.succeededCheckUserAccessToQueueRetrieved(duration); } + + public void getGetDelegationTokenRetrieved(long duration) { + LOG.info("Mocked: successful GetDelegationToken call with duration {}", duration); + metrics.succeededGetDelegationTokenRetrieved(duration); + } + + public void getRenewDelegationTokenRetrieved(long duration) { + LOG.info("Mocked: successful RenewDelegationToken call with duration {}", duration); + metrics.succeededRenewDelegationTokenRetrieved(duration); + } } @Test @@ -1510,16 +1530,16 @@ public void testGetRMNodeLabelsRetrievedFailed() { } @Test - public void testCheckUserAccessToQueueRetrievedRetrieved() { - long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved(); + public void testCheckUserAccessToQueueRetrieved() { + long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrieved(); goodSubCluster.getCheckUserAccessToQueueRetrieved(150); Assert.assertEquals(totalGoodBefore + 1, - metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved()); + metrics.getNumSucceededCheckUserAccessToQueueRetrieved()); Assert.assertEquals(150, metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA); goodSubCluster.getCheckUserAccessToQueueRetrieved(300); Assert.assertEquals(totalGoodBefore + 2, - 
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved()); + metrics.getNumSucceededCheckUserAccessToQueueRetrieved()); Assert.assertEquals(225, metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA); } @@ -1527,8 +1547,54 @@ public void testCheckUserAccessToQueueRetrievedRetrieved() { @Test public void testCheckUserAccessToQueueRetrievedFailed() { long totalBadBefore = metrics.getCheckUserAccessToQueueFailedRetrieved(); - badSubCluster.getCheckUserAccessToQueueRetrieved(); + badSubCluster.getCheckUserAccessToQueueFailed(); Assert.assertEquals(totalBadBefore + 1, metrics.getCheckUserAccessToQueueFailedRetrieved()); } + + @Test + public void testGetDelegationTokenRetrieved() { + long totalGoodBefore = metrics.getNumSucceededGetDelegationTokenRetrieved(); + goodSubCluster.getGetDelegationTokenRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededGetDelegationTokenRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededGetDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getGetDelegationTokenRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededGetDelegationTokenRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededGetDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testGetDelegationTokenRetrievedFailed() { + long totalBadBefore = metrics.getDelegationTokenFailedRetrieved(); + badSubCluster.getDelegationTokenFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getDelegationTokenFailedRetrieved()); + } + + @Test + public void testRenewDelegationTokenRetrieved() { + long totalGoodBefore = metrics.getNumSucceededRenewDelegationTokenRetrieved(); + goodSubCluster.getRenewDelegationTokenRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededRenewDelegationTokenRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededRenewDelegationTokenRetrieved(), 
ASSERT_DOUBLE_DELTA); + goodSubCluster.getRenewDelegationTokenRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededRenewDelegationTokenRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededRenewDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA); + } + + @Test + public void testRenewDelegationTokenRetrievedFailed() { + long totalBadBefore = metrics.getRenewDelegationTokenFailedRetrieved(); + badSubCluster.getRenewDelegationTokenFailed(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getRenewDelegationTokenFailedRetrieved()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 38f571c288..2488fc73b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.clientrm; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.Collection; +import org.apache.hadoop.io.Text; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.util.Time; @@ -100,6 +102,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import 
org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -123,10 +131,13 @@ import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; @@ -138,6 +149,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; @@ -170,7 +184,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest { private final static long DEFAULT_DURATION = 10 * 60 * 1000; @Override - public void setUp() { + public void setUp() throws IOException { super.setUpConfig(); interceptor = new TestableFederationClientInterceptor(); @@ -181,6 +195,11 @@ public void setUp() { interceptor.setConf(this.getConf()); interceptor.init(user); + RouterDelegationTokenSecretManager tokenSecretManager = + interceptor.createRouterRMDelegationTokenSecretManager(this.getConf()); + + tokenSecretManager.startThreads(); + interceptor.setTokenSecretManager(tokenSecretManager); subClusters = new ArrayList<>(); @@ -230,6 +249,7 @@ protected YarnConfiguration createConfiguration() { conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024); conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100); + conf.setBoolean("hadoop.security.authentication", true); return conf; } @@ -1550,4 +1570,138 @@ public void testGetNumMaxThreads() { int minThreads2 = interceptor.getNumMaxThreads(this.getConf()); Assert.assertEquals(8, minThreads2); } + + @Test + public void testGetDelegationToken() throws IOException, YarnException { + + // We design such a unit test to check + // that the execution of the GetDelegationToken method is as expected. + // + // 1. Apply for a DelegationToken for renewer1, + // the Router returns the DelegationToken of the user, and the KIND of the token is + // RM_DELEGATION_TOKEN + // + // 2. 
We maintain the compatibility with RMDelegationTokenIdentifier, + // we can serialize the token into RMDelegationTokenIdentifier. + // + // 3. We can get the issueDate, and compare the data in the StateStore, + // the data should be consistent. + + // Step1. We apply for DelegationToken for renewer1 + // Both response & delegationToken cannot be empty + GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); + when(request.getRenewer()).thenReturn("renewer1"); + GetDelegationTokenResponse response = interceptor.getDelegationToken(request); + Assert.assertNotNull(response); + Token delegationToken = response.getRMDelegationToken(); + Assert.assertNotNull(delegationToken); + Assert.assertEquals("RM_DELEGATION_TOKEN", delegationToken.getKind()); + + // Step2. Serialize the returned Token as RMDelegationTokenIdentifier. + org.apache.hadoop.security.token.Token token = + ConverterUtils.convertFromYarn(delegationToken, (Text) null); + RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier(); + Assert.assertNotNull(rMDelegationTokenIdentifier); + + // Step3. Verify the returned data of the token. 
+ String renewer = rMDelegationTokenIdentifier.getRenewer().toString(); + long issueDate = rMDelegationTokenIdentifier.getIssueDate(); + long maxDate = rMDelegationTokenIdentifier.getMaxDate(); + Assert.assertEquals("renewer1", renewer); + + long tokenMaxLifetime = this.getConf().getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + Assert.assertEquals(issueDate + tokenMaxLifetime, maxDate); + + RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); + Assert.assertNotNull(managerState); + + Map delegationTokenState = managerState.getTokenState(); + Assert.assertNotNull(delegationTokenState); + Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier)); + + long tokenRenewInterval = this.getConf().getLong( + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier); + Assert.assertEquals(issueDate + tokenRenewInterval, renewDate); + } + + @Test + public void testRenewDelegationToken() throws IOException, YarnException { + + // We design such a unit test to check + // that the execution of the RenewDelegationToken method is as expected + // 1. Call GetDelegationToken to apply for delegationToken. + // 2. Call renewDelegationToken to refresh delegationToken. + // By looking at the code of AbstractDelegationTokenSecretManager#renewToken, + // we know that renewTime is calculated as Math.min(id.getMaxDate(), now + tokenRenewInterval) + // so renewTime will be less than or equal to maxDate. + // 3. We will compare whether the expirationTime returned to the + // client is consistent with the renewDate in the stateStore. + + // Step1. Call GetDelegationToken to apply for delegationToken. 
+ GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); + when(request.getRenewer()).thenReturn("renewer2"); + GetDelegationTokenResponse response = interceptor.getDelegationToken(request); + Assert.assertNotNull(response); + Token delegationToken = response.getRMDelegationToken(); + + org.apache.hadoop.security.token.Token token = + ConverterUtils.convertFromYarn(delegationToken, (Text) null); + RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier(); + String renewer = rMDelegationTokenIdentifier.getRenewer().toString(); + long maxDate = rMDelegationTokenIdentifier.getMaxDate(); + Assert.assertEquals("renewer2", renewer); + + // Step2. Call renewDelegationToken to refresh delegationToken. + RenewDelegationTokenRequest renewRequest = Records.newRecord(RenewDelegationTokenRequest.class); + renewRequest.setDelegationToken(delegationToken); + RenewDelegationTokenResponse renewResponse = interceptor.renewDelegationToken(renewRequest); + Assert.assertNotNull(renewResponse); + + long expDate = renewResponse.getNextExpirationTime(); + Assert.assertTrue(expDate <= maxDate); + + // Step3. Compare whether the expirationTime returned to + // the client is consistent with the renewDate in the stateStore + RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); + Map delegationTokenState = managerState.getTokenState(); + Assert.assertNotNull(delegationTokenState); + Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier)); + long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier); + Assert.assertEquals(expDate, renewDate); + } + + @Test + public void testCancelDelegationToken() throws IOException, YarnException { + + // We design such a unit test to check + // that the execution of the CancelDelegationToken method is as expected + // 1. Call GetDelegationToken to apply for delegationToken. + // 2. Call CancelDelegationToken to cancel delegationToken. + // 3. 
Query the data in the StateStore and confirm that the Delegation has been deleted. + + // Step1. Call GetDelegationToken to apply for delegationToken. + GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class); + when(request.getRenewer()).thenReturn("renewer3"); + GetDelegationTokenResponse response = interceptor.getDelegationToken(request); + Assert.assertNotNull(response); + Token delegationToken = response.getRMDelegationToken(); + + // Step2. Call CancelDelegationToken to cancel delegationToken. + CancelDelegationTokenRequest cancelTokenRequest = + CancelDelegationTokenRequest.newInstance(delegationToken); + CancelDelegationTokenResponse cancelTokenResponse = + interceptor.cancelDelegationToken(cancelTokenRequest); + Assert.assertNotNull(cancelTokenResponse); + + // Step3. Query the data in the StateStore and confirm that the Delegation has been deleted. + // At this point, the size of delegationTokenState should be 0. + RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState(); + Map delegationTokenState = managerState.getTokenState(); + Assert.assertNotNull(delegationTokenState); + Assert.assertEquals(0, delegationTokenState.size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index 8279899e38..c8c647a0d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -28,8 
+28,10 @@
 import java.util.Map;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -38,6 +40,7 @@
 import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -51,6 +54,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -216,4 +220,28 @@ public void shutdown() {
     mockRMs.clear();
     super.shutdown();
   }
+
+  /**
+   * Creates a RouterDelegationTokenSecretManager from four intervals read out of conf.
+   * @param conf source of the key-update, max-lifetime, renew and remove-scan intervals.
+   * @return a new RouterDelegationTokenSecretManager built from those four values.
+   */
+  public RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager(
+      Configuration conf) {
+    long keyUpdateInterval = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
+    long maxLifetime = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
+    long renewInterval = conf.getLong(
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+    long scanInterval = conf.getTimeDuration(
+        YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
+        YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    return new RouterDelegationTokenSecretManager(
+        keyUpdateInterval, maxLifetime, renewInterval, scanInterval);
+  }
 }
\ No newline at end of file