YARN-11158. Support (Create/Renew/Cancel) DelegationToken API's for Federation. (#5104)

This commit is contained in:
slfan1989 2022-12-02 05:20:21 +08:00 committed by GitHub
parent 5440c75c4a
commit 4af4997e11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 468 additions and 14 deletions

View File

@ -127,6 +127,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
@Metric("# of checkUserAccessToQueue failed to be retrieved")
private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
@Metric("# of getDelegationToken failed to be retrieved")
private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
@Metric("# of renewDelegationToken failed to be retrieved")
private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -215,6 +221,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
@Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
@Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
private MutableRate totalSucceededGetDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
private MutableRate totalSucceededRenewDelegationTokenRetrieved;
@Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
private MutableRate totalSucceededCancelDelegationTokenRetrieved;
/**
* Provide quantile counters for all latencies.
@ -262,6 +274,9 @@ public final class RouterMetrics {
private MutableQuantiles getRefreshQueuesLatency;
private MutableQuantiles getRMNodeLabelsLatency;
private MutableQuantiles checkUserAccessToQueueLatency;
private MutableQuantiles getDelegationTokenLatency;
private MutableQuantiles renewDelegationTokenLatency;
private MutableQuantiles cancelDelegationTokenLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -423,6 +438,15 @@ private RouterMetrics() {
checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
"latency of get apptimeouts timeouts", "ops", "latency", 10);
getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
"latency of get delegation token timeouts", "ops", "latency", 10);
renewDelegationTokenLatency = registry.newQuantiles("renewDelegationTokenLatency",
"latency of renew delegation token timeouts", "ops", "latency", 10);
cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
"latency of cancel delegation token timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -655,10 +679,25 @@ public long getNumSucceededGetRMNodeLabelsRetrieved() {
}
@VisibleForTesting
public long getNumSucceededCheckUserAccessToQueueRetrievedRetrieved() {
public long getNumSucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -874,6 +913,21 @@ public double getLatencySucceededCheckUserAccessToQueueRetrieved() {
return totalSucceededCheckUserAccessToQueueRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetDelegationTokenRetrieved() {
return totalSucceededGetDelegationTokenRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRenewDelegationTokenRetrieved() {
return totalSucceededRenewDelegationTokenRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededCancelDelegationTokenRetrieved() {
return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -1068,6 +1122,18 @@ public int getCheckUserAccessToQueueFailedRetrieved() {
return numCheckUserAccessToQueueFailedRetrieved.value();
}
public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}
public int getRenewDelegationTokenFailedRetrieved() {
return numRenewDelegationTokenFailedRetrieved.value();
}
public int getCancelDelegationTokenFailedRetrieved() {
return numCancelDelegationTokenFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -1283,6 +1349,21 @@ public void succeededCheckUserAccessToQueueRetrieved(long duration) {
checkUserAccessToQueueLatency.add(duration);
}
public void succeededGetDelegationTokenRetrieved(long duration) {
totalSucceededGetDelegationTokenRetrieved.add(duration);
getDelegationTokenLatency.add(duration);
}
public void succeededRenewDelegationTokenRetrieved(long duration) {
totalSucceededRenewDelegationTokenRetrieved.add(duration);
renewDelegationTokenLatency.add(duration);
}
public void succeededCancelDelegationTokenRetrieved(long duration) {
totalSucceededCancelDelegationTokenRetrieved.add(duration);
cancelDelegationTokenLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -1454,4 +1535,16 @@ public void incrGetRMNodeLabelsFailedRetrieved() {
public void incrCheckUserAccessToQueueFailedRetrieved() {
numCheckUserAccessToQueueFailedRetrieved.incr();
}
public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}
public void incrRenewDelegationTokenFailedRetrieved() {
numRenewDelegationTokenFailedRetrieved.incr();
}
public void incrCancelDelegationTokenFailedRetrieved() {
numCancelDelegationTokenFailedRetrieved.incr();
}
}

View File

@ -24,10 +24,12 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,6 +38,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.EnumSet;
import java.io.IOException;
/**
@ -470,6 +473,27 @@ public static void validateContainerId(String containerId)
}
}
public static boolean isAllowedDelegationTokenOp() throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
return EnumSet.of(UserGroupInformation.AuthenticationMethod.KERBEROS,
UserGroupInformation.AuthenticationMethod.KERBEROS_SSL,
UserGroupInformation.AuthenticationMethod.CERTIFICATE)
.contains(UserGroupInformation.getCurrentUser()
.getRealAuthenticationMethod());
} else {
return true;
}
}
public static String getRenewerForToken(Token<RMDelegationTokenIdentifier> token)
throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
// we can always renew our own tokens
return loginUser.getUserName().equals(user.getUserName())
? token.decodeIdentifier().getRenewer().toString() : user.getShortUserName();
}
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {

View File

@ -20,6 +20,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
@ -40,7 +41,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
@ -118,9 +118,13 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
@ -136,6 +140,7 @@
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -1392,19 +1397,103 @@ public GetContainersResponse getContainers(GetContainersRequest request)
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getRenewer() == null) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing getDelegationToken request or Renewer.", null);
}
try {
// Verify that the connection is kerberos authenticated
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be issued only with kerberos authentication.");
}
long startTime = clock.getTime();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Text owner = new Text(ugi.getUserName());
Text realUser = null;
if (ugi.getRealUser() != null) {
realUser = new Text(ugi.getRealUser().getUserName());
}
RMDelegationTokenIdentifier tokenIdentifier =
new RMDelegationTokenIdentifier(owner, new Text(request.getRenewer()), realUser);
Token<RMDelegationTokenIdentifier> realRMDToken =
new Token<>(tokenIdentifier, this.getTokenSecretManager());
org.apache.hadoop.yarn.api.records.Token routerRMDTToken =
BuilderUtils.newDelegationToken(realRMDToken.getIdentifier(),
realRMDToken.getKind().toString(),
realRMDToken.getPassword(), realRMDToken.getService().toString());
long stopTime = clock.getTime();
routerMetrics.succeededGetDelegationTokenRetrieved((stopTime - startTime));
return GetDelegationTokenResponse.newInstance(routerRMDTToken);
} catch(IOException e) {
routerMetrics.incrGetDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}
long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = RouterServerUtil.getRenewerForToken(token);
long nextExpTime = this.getTokenSecretManager().renewToken(token, user);
RenewDelegationTokenResponse renewResponse =
Records.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
long stopTime = clock.getTime();
routerMetrics.succeededRenewDelegationTokenRetrieved((stopTime - startTime));
return renewResponse;
} catch (IOException e) {
routerMetrics.incrRenewDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
try {
if (!RouterServerUtil.isAllowedDelegationTokenOp()) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}
long startTime = clock.getTime();
org.apache.hadoop.yarn.api.records.Token protoToken = request.getDelegationToken();
Token<RMDelegationTokenIdentifier> token = new Token<>(
protoToken.getIdentifier().array(), protoToken.getPassword().array(),
new Text(protoToken.getKind()), new Text(protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getUserName();
this.getTokenSecretManager().cancelToken(token, user);
long stopTime = clock.getTime();
routerMetrics.succeededCancelDelegationTokenRetrieved((stopTime - startTime));
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
routerMetrics.incrCancelDelegationTokenFailedRetrieved();
throw new YarnException(e);
}
}
@Override

View File

@ -519,10 +519,20 @@ public void getRMNodeLabelsFailed() {
metrics.incrGetRMNodeLabelsFailedRetrieved();
}
public void getCheckUserAccessToQueueRetrieved() {
LOG.info("Mocked: failed checkUserAccessToQueueRetrieved call");
public void getCheckUserAccessToQueueFailed() {
LOG.info("Mocked: failed checkUserAccessToQueue call");
metrics.incrCheckUserAccessToQueueFailedRetrieved();
}
public void getDelegationTokenFailed() {
LOG.info("Mocked: failed getDelegationToken call");
metrics.incrGetDelegationTokenFailedRetrieved();
}
public void getRenewDelegationTokenFailed() {
LOG.info("Mocked: failed renewDelegationToken call");
metrics.incrRenewDelegationTokenFailedRetrieved();
}
}
// Records successes for all calls
@ -743,6 +753,16 @@ public void getCheckUserAccessToQueueRetrieved(long duration) {
LOG.info("Mocked: successful CheckUserAccessToQueue call with duration {}", duration);
metrics.succeededCheckUserAccessToQueueRetrieved(duration);
}
public void getGetDelegationTokenRetrieved(long duration) {
LOG.info("Mocked: successful GetDelegationToken call with duration {}", duration);
metrics.succeededGetDelegationTokenRetrieved(duration);
}
public void getRenewDelegationTokenRetrieved(long duration) {
LOG.info("Mocked: successful RenewDelegationToken call with duration {}", duration);
metrics.succeededRenewDelegationTokenRetrieved(duration);
}
}
@Test
@ -1510,16 +1530,16 @@ public void testGetRMNodeLabelsRetrievedFailed() {
}
@Test
public void testCheckUserAccessToQueueRetrievedRetrieved() {
long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved();
public void testCheckUserAccessToQueueRetrieved() {
long totalGoodBefore = metrics.getNumSucceededCheckUserAccessToQueueRetrieved();
goodSubCluster.getCheckUserAccessToQueueRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
metrics.getNumSucceededCheckUserAccessToQueueRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getCheckUserAccessToQueueRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededCheckUserAccessToQueueRetrievedRetrieved());
metrics.getNumSucceededCheckUserAccessToQueueRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededCheckUserAccessToQueueRetrieved(), ASSERT_DOUBLE_DELTA);
}
@ -1527,8 +1547,54 @@ public void testCheckUserAccessToQueueRetrievedRetrieved() {
@Test
public void testCheckUserAccessToQueueRetrievedFailed() {
long totalBadBefore = metrics.getCheckUserAccessToQueueFailedRetrieved();
badSubCluster.getCheckUserAccessToQueueRetrieved();
badSubCluster.getCheckUserAccessToQueueFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getCheckUserAccessToQueueFailedRetrieved());
}
@Test
public void testGetDelegationTokenRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetDelegationTokenRetrieved();
goodSubCluster.getGetDelegationTokenRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetDelegationTokenRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getGetDelegationTokenRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetDelegationTokenRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetDelegationTokenRetrievedFailed() {
long totalBadBefore = metrics.getDelegationTokenFailedRetrieved();
badSubCluster.getDelegationTokenFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getDelegationTokenFailedRetrieved());
}
@Test
public void testRenewDelegationTokenRetrieved() {
long totalGoodBefore = metrics.getNumSucceededRenewDelegationTokenRetrieved();
goodSubCluster.getRenewDelegationTokenRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededRenewDelegationTokenRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededRenewDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getRenewDelegationTokenRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededRenewDelegationTokenRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededRenewDelegationTokenRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testRenewDelegationTokenRetrievedFailed() {
long totalBadBefore = metrics.getRenewDelegationTokenFailedRetrieved();
badSubCluster.getRenewDelegationTokenFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getRenewDelegationTokenFailedRetrieved());
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.router.clientrm;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@ -32,6 +33,7 @@
import java.util.Arrays;
import java.util.Collection;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
@ -100,6 +102,12 @@
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -123,10 +131,13 @@
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -138,6 +149,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
@ -170,7 +184,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private final static long DEFAULT_DURATION = 10 * 60 * 1000;
@Override
public void setUp() {
public void setUp() throws IOException {
super.setUpConfig();
interceptor = new TestableFederationClientInterceptor();
@ -181,6 +195,11 @@ public void setUp() {
interceptor.setConf(this.getConf());
interceptor.init(user);
RouterDelegationTokenSecretManager tokenSecretManager =
interceptor.createRouterRMDelegationTokenSecretManager(this.getConf());
tokenSecretManager.startThreads();
interceptor.setTokenSecretManager(tokenSecretManager);
subClusters = new ArrayList<>();
@ -230,6 +249,7 @@ protected YarnConfiguration createConfiguration() {
conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024);
conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100);
conf.setBoolean("hadoop.security.authentication", true);
return conf;
}
@ -1550,4 +1570,138 @@ public void testGetNumMaxThreads() {
int minThreads2 = interceptor.getNumMaxThreads(this.getConf());
Assert.assertEquals(8, minThreads2);
}
@Test
public void testGetDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the GetDelegationToken method is as expected.
//
// 1. Apply for a DelegationToken for renewer1,
// the Router returns the DelegationToken of the user, and the KIND of the token is
// RM_DELEGATION_TOKEN
//
// 2. We maintain the compatibility with RMDelegationTokenIdentifier,
// we can serialize the token into RMDelegationTokenIdentifier.
//
// 3. We can get the issueDate, and compare the data in the StateStore,
// the data should be consistent.
// Step1. We apply for DelegationToken for renewer1
// Both response & delegationToken cannot be empty
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer1");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
Assert.assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
Assert.assertNotNull(delegationToken);
Assert.assertEquals("RM_DELEGATION_TOKEN", delegationToken.getKind());
// Step2. Serialize the returned Token as RMDelegationTokenIdentifier.
org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier();
Assert.assertNotNull(rMDelegationTokenIdentifier);
// Step3. Verify the returned data of the token.
String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
long issueDate = rMDelegationTokenIdentifier.getIssueDate();
long maxDate = rMDelegationTokenIdentifier.getMaxDate();
Assert.assertEquals("renewer1", renewer);
long tokenMaxLifetime = this.getConf().getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
Assert.assertEquals(issueDate + tokenMaxLifetime, maxDate);
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
Assert.assertNotNull(managerState);
Map<RMDelegationTokenIdentifier, Long> delegationTokenState = managerState.getTokenState();
Assert.assertNotNull(delegationTokenState);
Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
long tokenRenewInterval = this.getConf().getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier);
Assert.assertEquals(issueDate + tokenRenewInterval, renewDate);
}
@Test
public void testRenewDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the GetDelegationToken method is as expected
// 1. Call GetDelegationToken to apply for delegationToken.
// 2. Call renewDelegationToken to refresh delegationToken.
// By looking at the code of AbstractDelegationTokenSecretManager#renewToken,
// we know that renewTime is calculated as Math.min(id.getMaxDate(), now + tokenRenewInterval)
// so renewTime will be less than or equal to maxDate.
// 3. We will compare whether the expirationTime returned to the
// client is consistent with the renewDate in the stateStore.
// Step1. Call GetDelegationToken to apply for delegationToken.
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer2");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
Assert.assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
org.apache.hadoop.security.token.Token<RMDelegationTokenIdentifier> token =
ConverterUtils.convertFromYarn(delegationToken, (Text) null);
RMDelegationTokenIdentifier rMDelegationTokenIdentifier = token.decodeIdentifier();
String renewer = rMDelegationTokenIdentifier.getRenewer().toString();
long maxDate = rMDelegationTokenIdentifier.getMaxDate();
Assert.assertEquals("renewer2", renewer);
// Step2. Call renewDelegationToken to refresh delegationToken.
RenewDelegationTokenRequest renewRequest = Records.newRecord(RenewDelegationTokenRequest.class);
renewRequest.setDelegationToken(delegationToken);
RenewDelegationTokenResponse renewResponse = interceptor.renewDelegationToken(renewRequest);
Assert.assertNotNull(renewResponse);
long expDate = renewResponse.getNextExpirationTime();
Assert.assertTrue(expDate <= maxDate);
// Step3. Compare whether the expirationTime returned to
// the client is consistent with the renewDate in the stateStore
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
Map<RMDelegationTokenIdentifier, Long> delegationTokenState = managerState.getTokenState();
Assert.assertNotNull(delegationTokenState);
Assert.assertTrue(delegationTokenState.containsKey(rMDelegationTokenIdentifier));
long renewDate = delegationTokenState.get(rMDelegationTokenIdentifier);
Assert.assertEquals(expDate, renewDate);
}
@Test
public void testCancelDelegationToken() throws IOException, YarnException {
// We design such a unit test to check
// that the execution of the CancelDelegationToken method is as expected
// 1. Call GetDelegationToken to apply for delegationToken.
// 2. Call CancelDelegationToken to cancel delegationToken.
// 3. Query the data in the StateStore and confirm that the Delegation has been deleted.
// Step1. Call GetDelegationToken to apply for delegationToken.
GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
when(request.getRenewer()).thenReturn("renewer3");
GetDelegationTokenResponse response = interceptor.getDelegationToken(request);
Assert.assertNotNull(response);
Token delegationToken = response.getRMDelegationToken();
// Step2. Call CancelDelegationToken to cancel delegationToken.
CancelDelegationTokenRequest cancelTokenRequest =
CancelDelegationTokenRequest.newInstance(delegationToken);
CancelDelegationTokenResponse cancelTokenResponse =
interceptor.cancelDelegationToken(cancelTokenRequest);
Assert.assertNotNull(cancelTokenResponse);
// Step3. Query the data in the StateStore and confirm that the Delegation has been deleted.
// At this point, the size of delegationTokenState should be 0.
RouterRMDTSecretManagerState managerState = stateStore.getRouterRMSecretManagerState();
Map<RMDelegationTokenIdentifier, Long> delegationTokenState = managerState.getTokenState();
Assert.assertNotNull(delegationTokenState);
Assert.assertEquals(0, delegationTokenState.size());
}
}

View File

@ -28,8 +28,10 @@
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@ -38,6 +40,7 @@
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -51,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.Assert;
import org.slf4j.Logger;
@ -216,4 +220,28 @@ public void shutdown() {
mockRMs.clear();
super.shutdown();
}
public RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager(
Configuration conf) {
long secretKeyInterval = conf.getLong(
YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
long tokenMaxLifetime = conf.getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
long tokenRenewInterval = conf.getLong(
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
long removeScanInterval = conf.getTimeDuration(
YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
return new RouterDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, removeScanInterval);
}
}