YARN-11295. [Federation] Router Support DelegationToken in MemoryStore mode. (#5032)
This commit is contained in:
parent
c4aa41aa80
commit
d93e6f0cbb
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -44,6 +45,8 @@ public abstract class AbstractClientRequestInterceptor
|
|||||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||||
protected UserGroupInformation user = null;
|
protected UserGroupInformation user = null;
|
||||||
|
|
||||||
|
private RouterDelegationTokenSecretManager tokenSecretManager = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the {@link ClientRequestInterceptor} in the chain.
|
* Sets the {@link ClientRequestInterceptor} in the chain.
|
||||||
*/
|
*/
|
||||||
@ -125,4 +128,13 @@ private void setupUser(String userName) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RouterDelegationTokenSecretManager getTokenSecretManager() {
|
||||||
|
return tokenSecretManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager) {
|
||||||
|
this.tokenSecretManager = tokenSecretManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configurable;
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the contract to be implemented by the request interceptor classes,
|
* Defines the contract to be implemented by the request interceptor classes,
|
||||||
@ -62,4 +63,18 @@ public interface ClientRequestInterceptor
|
|||||||
*/
|
*/
|
||||||
ClientRequestInterceptor getNextInterceptor();
|
ClientRequestInterceptor getNextInterceptor();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set RouterDelegationTokenSecretManager for specific interceptor to support Token operations,
|
||||||
|
* including create Token, update Token, and delete Token.
|
||||||
|
*
|
||||||
|
* @param tokenSecretManager Router DelegationTokenSecretManager
|
||||||
|
*/
|
||||||
|
void setTokenSecretManager(RouterDelegationTokenSecretManager tokenSecretManager);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get RouterDelegationTokenSecretManager.
|
||||||
|
*
|
||||||
|
* @return Router DelegationTokenSecretManager.
|
||||||
|
*/
|
||||||
|
RouterDelegationTokenSecretManager getTokenSecretManager();
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -105,6 +106,7 @@
|
|||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -136,6 +138,8 @@ public class RouterClientRMService extends AbstractService
|
|||||||
// and remove the oldest used ones.
|
// and remove the oldest used ones.
|
||||||
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
|
private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
|
||||||
|
|
||||||
|
private RouterDelegationTokenSecretManager routerDTSecretManager;
|
||||||
|
|
||||||
public RouterClientRMService() {
|
public RouterClientRMService() {
|
||||||
super(RouterClientRMService.class.getName());
|
super(RouterClientRMService.class.getName());
|
||||||
}
|
}
|
||||||
@ -164,8 +168,12 @@ protected void serviceStart() throws Exception {
|
|||||||
serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
|
serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
|
||||||
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
|
YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
|
||||||
|
|
||||||
|
// Initialize RouterRMDelegationTokenSecretManager.
|
||||||
|
routerDTSecretManager = createRouterRMDelegationTokenSecretManager(conf);
|
||||||
|
routerDTSecretManager.startThreads();
|
||||||
|
|
||||||
this.server = rpc.getServer(ApplicationClientProtocol.class, this,
|
this.server = rpc.getServer(ApplicationClientProtocol.class, this,
|
||||||
listenerEndpoint, serverConf, null, numWorkerThreads);
|
listenerEndpoint, serverConf, routerDTSecretManager, numWorkerThreads);
|
||||||
|
|
||||||
// Enable service authorization?
|
// Enable service authorization?
|
||||||
if (conf.getBoolean(
|
if (conf.getBoolean(
|
||||||
@ -508,6 +516,13 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
|
|||||||
ClientRequestInterceptor interceptorChain =
|
ClientRequestInterceptor interceptorChain =
|
||||||
this.createRequestInterceptorChain();
|
this.createRequestInterceptorChain();
|
||||||
interceptorChain.init(user);
|
interceptorChain.init(user);
|
||||||
|
|
||||||
|
// We set the RouterDelegationTokenSecretManager instance to the interceptorChain
|
||||||
|
// and let the interceptor use it.
|
||||||
|
if (routerDTSecretManager != null) {
|
||||||
|
interceptorChain.setTokenSecretManager(routerDTSecretManager);
|
||||||
|
}
|
||||||
|
|
||||||
chainWrapper.init(interceptorChain);
|
chainWrapper.init(interceptorChain);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
|
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
|
||||||
@ -558,4 +573,42 @@ protected void finalize() {
|
|||||||
public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
|
public Map<String, RequestInterceptorChainWrapper> getUserPipelineMap() {
|
||||||
return userPipelineMap;
|
return userPipelineMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create RouterRMDelegationTokenSecretManager.
|
||||||
|
* In the YARN federation, the Router will replace the RM to
|
||||||
|
* manage the RMDelegationToken (generate, update, cancel),
|
||||||
|
* so the relevant configuration parameters still obtain the configuration parameters of the RM.
|
||||||
|
*
|
||||||
|
* @param conf Configuration
|
||||||
|
* @return RouterDelegationTokenSecretManager.
|
||||||
|
*/
|
||||||
|
protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager(
|
||||||
|
Configuration conf) {
|
||||||
|
|
||||||
|
long secretKeyInterval = conf.getLong(
|
||||||
|
YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY,
|
||||||
|
YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
|
||||||
|
|
||||||
|
long tokenMaxLifetime = conf.getLong(
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY,
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
|
||||||
|
|
||||||
|
long tokenRenewInterval = conf.getLong(
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
|
||||||
|
|
||||||
|
long removeScanInterval = conf.getTimeDuration(
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY,
|
||||||
|
YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT,
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
return new RouterDelegationTokenSecretManager(secretKeyInterval,
|
||||||
|
tokenMaxLifetime, tokenRenewInterval, removeScanInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public RouterDelegationTokenSecretManager getRouterDTSecretManager() {
|
||||||
|
return routerDTSecretManager;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,254 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.router.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Router specific delegation token secret manager.
|
||||||
|
* The secret manager is responsible for generating and accepting the password
|
||||||
|
* for each token.
|
||||||
|
*/
|
||||||
|
public class RouterDelegationTokenSecretManager
|
||||||
|
extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(RouterDelegationTokenSecretManager.class);
|
||||||
|
|
||||||
|
private FederationStateStoreFacade federationFacade;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a Router Secret manager.
|
||||||
|
*
|
||||||
|
* @param delegationKeyUpdateInterval the number of milliseconds for rolling
|
||||||
|
* new secret keys.
|
||||||
|
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation
|
||||||
|
* tokens in milliseconds
|
||||||
|
* @param delegationTokenRenewInterval how often the tokens must be renewed
|
||||||
|
* in milliseconds
|
||||||
|
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
|
||||||
|
*/
|
||||||
|
public RouterDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||||
|
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
|
||||||
|
long delegationTokenRemoverScanInterval) {
|
||||||
|
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
|
||||||
|
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
||||||
|
this.federationFacade = FederationStateStoreFacade.getInstance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMDelegationTokenIdentifier createIdentifier() {
|
||||||
|
return new RMDelegationTokenIdentifier();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean shouldIgnoreException(Exception e) {
|
||||||
|
return !running && e.getCause() instanceof InterruptedException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router Supports Store the New Master Key.
|
||||||
|
* During this Process, Facade will call the specific StateStore to store the MasterKey.
|
||||||
|
*
|
||||||
|
* @param newKey DelegationKey
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void storeNewMasterKey(DelegationKey newKey) {
|
||||||
|
try {
|
||||||
|
federationFacade.storeNewMasterKey(newKey);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!shouldIgnoreException(e)) {
|
||||||
|
LOG.error("Error in storing master key with KeyID: {}.", newKey.getKeyId());
|
||||||
|
ExitUtil.terminate(1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router Supports Remove the master key.
|
||||||
|
* During this Process, Facade will call the specific StateStore to remove the MasterKey.
|
||||||
|
*
|
||||||
|
* @param delegationKey DelegationKey
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void removeStoredMasterKey(DelegationKey delegationKey) {
|
||||||
|
try {
|
||||||
|
federationFacade.removeStoredMasterKey(delegationKey);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!shouldIgnoreException(e)) {
|
||||||
|
LOG.error("Error in removing master key with KeyID: {}.", delegationKey.getKeyId());
|
||||||
|
ExitUtil.terminate(1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router Supports Store new Token.
|
||||||
|
*
|
||||||
|
* @param identifier RMDelegationToken
|
||||||
|
* @param renewDate renewDate
|
||||||
|
* @throws IOException IO exception occurred.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void storeNewToken(RMDelegationTokenIdentifier identifier,
|
||||||
|
long renewDate) throws IOException {
|
||||||
|
try {
|
||||||
|
federationFacade.storeNewToken(identifier, renewDate);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!shouldIgnoreException(e)) {
|
||||||
|
LOG.error("Error in storing RMDelegationToken with sequence number: {}.",
|
||||||
|
identifier.getSequenceNumber());
|
||||||
|
ExitUtil.terminate(1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router Supports Update Token.
|
||||||
|
*
|
||||||
|
* @param id RMDelegationToken
|
||||||
|
* @param renewDate renewDate
|
||||||
|
* @throws IOException IO exception occurred
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void updateStoredToken(RMDelegationTokenIdentifier id, long renewDate) throws IOException {
|
||||||
|
try {
|
||||||
|
federationFacade.updateStoredToken(id, renewDate);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!shouldIgnoreException(e)) {
|
||||||
|
LOG.error("Error in updating persisted RMDelegationToken with sequence number: {}.",
|
||||||
|
id.getSequenceNumber());
|
||||||
|
ExitUtil.terminate(1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router Supports Remove Token.
|
||||||
|
*
|
||||||
|
* @param identifier Delegation Token
|
||||||
|
* @throws IOException IO exception occurred.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void removeStoredToken(RMDelegationTokenIdentifier identifier) throws IOException {
|
||||||
|
try {
|
||||||
|
federationFacade.removeStoredToken(identifier);
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (!shouldIgnoreException(e)) {
|
||||||
|
LOG.error("Error in removing RMDelegationToken with sequence number: {}",
|
||||||
|
identifier.getSequenceNumber());
|
||||||
|
ExitUtil.terminate(1, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Router supports obtaining the DelegationKey stored in the Router StateStote
|
||||||
|
* according to the DelegationKey.
|
||||||
|
*
|
||||||
|
* @param key Param DelegationKey
|
||||||
|
* @return Delegation Token
|
||||||
|
* @throws YarnException An internal conversion error occurred when getting the Token
|
||||||
|
* @throws IOException IO exception occurred
|
||||||
|
*/
|
||||||
|
public DelegationKey getMasterKeyByDelegationKey(DelegationKey key)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
try {
|
||||||
|
RouterMasterKeyResponse response = federationFacade.getMasterKeyByDelegationKey(key);
|
||||||
|
RouterMasterKey masterKey = response.getRouterMasterKey();
|
||||||
|
ByteBuffer keyByteBuf = masterKey.getKeyBytes();
|
||||||
|
byte[] keyBytes = new byte[keyByteBuf.remaining()];
|
||||||
|
keyByteBuf.get(keyBytes);
|
||||||
|
DelegationKey delegationKey =
|
||||||
|
new DelegationKey(masterKey.getKeyId(), masterKey.getExpiryDate(), keyBytes);
|
||||||
|
return delegationKey;
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new IOException(ex);
|
||||||
|
} catch (YarnException ex) {
|
||||||
|
throw new YarnException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get RMDelegationTokenIdentifier according to RouterStoreToken.
|
||||||
|
*
|
||||||
|
* @param identifier RMDelegationTokenIdentifier
|
||||||
|
* @return RMDelegationTokenIdentifier
|
||||||
|
* @throws YarnException An internal conversion error occurred when getting the Token
|
||||||
|
* @throws IOException IO exception occurred
|
||||||
|
*/
|
||||||
|
public RMDelegationTokenIdentifier getTokenByRouterStoreToken(
|
||||||
|
RMDelegationTokenIdentifier identifier) throws YarnException, IOException {
|
||||||
|
try {
|
||||||
|
RouterRMTokenResponse response = federationFacade.getTokenByRouterStoreToken(identifier);
|
||||||
|
YARNDelegationTokenIdentifier responseIdentifier =
|
||||||
|
response.getRouterStoreToken().getTokenIdentifier();
|
||||||
|
return (RMDelegationTokenIdentifier) responseIdentifier;
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new YarnException(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFederationFacade(FederationStateStoreFacade federationFacade) {
|
||||||
|
this.federationFacade = federationFacade;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getLatestDTSequenceNumber() {
|
||||||
|
return delegationTokenSequenceNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized Set<DelegationKey> getAllMasterKeys() {
|
||||||
|
return new HashSet<>(allKeys.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Public
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
|
||||||
|
Map<RMDelegationTokenIdentifier, Long> allTokens = new HashMap<>();
|
||||||
|
for (Map.Entry<RMDelegationTokenIdentifier,
|
||||||
|
DelegationTokenInformation> entry : currentTokens.entrySet()) {
|
||||||
|
RMDelegationTokenIdentifier keyIdentifier = entry.getKey();
|
||||||
|
DelegationTokenInformation tokenInformation = entry.getValue();
|
||||||
|
allTokens.put(keyIdentifier, tokenInformation.getRenewDate());
|
||||||
|
}
|
||||||
|
return allTokens;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.security;
|
@ -24,7 +24,9 @@
|
|||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
|
||||||
import org.apache.hadoop.yarn.server.router.Router;
|
import org.apache.hadoop.yarn.server.router.Router;
|
||||||
@ -179,6 +181,9 @@ public static File createKeytab(String principal, String filename) throws Except
|
|||||||
*/
|
*/
|
||||||
public synchronized void startSecureRouter() {
|
public synchronized void startSecureRouter() {
|
||||||
assertNull("Router is already running", router);
|
assertNull("Router is already running", router);
|
||||||
|
MemoryFederationStateStore stateStore = new MemoryFederationStateStore();
|
||||||
|
stateStore.init(getConf());
|
||||||
|
FederationStateStoreFacade.getInstance().reinitialize(stateStore, getConf());
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
router = new Router();
|
router = new Router();
|
||||||
router.init(conf);
|
router.init(conf);
|
||||||
@ -238,4 +243,7 @@ public static ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
|
|||||||
return mockRMs;
|
return mockRMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Configuration getConf() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,201 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.router.secure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
|
||||||
|
import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
|
|
||||||
|
public class TestRouterDelegationTokenSecretManager extends AbstractSecureRouterTest {
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestRouterDelegationTokenSecretManager.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterStoreNewMasterKey() throws Exception {
|
||||||
|
LOG.info("Test RouterDelegationTokenSecretManager: StoreNewMasterKey.");
|
||||||
|
|
||||||
|
// Start the Router in Secure Mode
|
||||||
|
startSecureRouter();
|
||||||
|
|
||||||
|
// Store NewMasterKey
|
||||||
|
RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
|
||||||
|
RouterDelegationTokenSecretManager secretManager =
|
||||||
|
routerClientRMService.getRouterDTSecretManager();
|
||||||
|
DelegationKey storeKey = new DelegationKey(1234, 4321, "keyBytes".getBytes());
|
||||||
|
secretManager.storeNewMasterKey(storeKey);
|
||||||
|
|
||||||
|
// Get DelegationKey
|
||||||
|
DelegationKey paramKey = new DelegationKey(1234, 4321, "keyBytes".getBytes());
|
||||||
|
DelegationKey responseKey = secretManager.getMasterKeyByDelegationKey(paramKey);
|
||||||
|
|
||||||
|
assertNotNull(paramKey);
|
||||||
|
assertEquals(storeKey.getExpiryDate(), responseKey.getExpiryDate());
|
||||||
|
assertEquals(storeKey.getKeyId(), responseKey.getKeyId());
|
||||||
|
assertArrayEquals(storeKey.getEncodedKey(), responseKey.getEncodedKey());
|
||||||
|
assertEquals(storeKey, responseKey);
|
||||||
|
|
||||||
|
stopSecureRouter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterRemoveStoredMasterKey() throws Exception {
|
||||||
|
LOG.info("Test RouterDelegationTokenSecretManager: RemoveStoredMasterKey.");
|
||||||
|
|
||||||
|
// Start the Router in Secure Mode
|
||||||
|
startSecureRouter();
|
||||||
|
|
||||||
|
// Store NewMasterKey
|
||||||
|
RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
|
||||||
|
RouterDelegationTokenSecretManager secretManager =
|
||||||
|
routerClientRMService.getRouterDTSecretManager();
|
||||||
|
DelegationKey storeKey = new DelegationKey(1234, 4321, "keyBytes".getBytes());
|
||||||
|
secretManager.storeNewMasterKey(storeKey);
|
||||||
|
|
||||||
|
// Remove DelegationKey
|
||||||
|
secretManager.removeStoredMasterKey(storeKey);
|
||||||
|
|
||||||
|
// Get DelegationKey
|
||||||
|
DelegationKey paramKey = new DelegationKey(1234, 4321, "keyBytes".getBytes());
|
||||||
|
LambdaTestUtils.intercept(IOException.class,
|
||||||
|
"GetMasterKey with keyID: " + storeKey.getKeyId() + " does not exist.",
|
||||||
|
() -> secretManager.getMasterKeyByDelegationKey(paramKey));
|
||||||
|
|
||||||
|
stopSecureRouter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterStoreNewToken() throws Exception {
|
||||||
|
LOG.info("Test RouterDelegationTokenSecretManager: StoreNewToken.");
|
||||||
|
|
||||||
|
// Start the Router in Secure Mode
|
||||||
|
startSecureRouter();
|
||||||
|
|
||||||
|
// Store new rm-token
|
||||||
|
RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
|
||||||
|
RouterDelegationTokenSecretManager secretManager =
|
||||||
|
routerClientRMService.getRouterDTSecretManager();
|
||||||
|
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
|
||||||
|
int sequenceNumber = 1;
|
||||||
|
dtId1.setSequenceNumber(sequenceNumber);
|
||||||
|
Long renewDate1 = Time.now();
|
||||||
|
secretManager.storeNewToken(dtId1, renewDate1);
|
||||||
|
|
||||||
|
// query rm-token
|
||||||
|
RMDelegationTokenIdentifier dtId2 = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
|
||||||
|
dtId2.setSequenceNumber(sequenceNumber);
|
||||||
|
RMDelegationTokenIdentifier dtId3 = secretManager.getTokenByRouterStoreToken(dtId2);
|
||||||
|
Assert.assertEquals(dtId1, dtId3);
|
||||||
|
|
||||||
|
// query rm-token2 not exists
|
||||||
|
sequenceNumber++;
|
||||||
|
dtId2.setSequenceNumber(2);
|
||||||
|
LambdaTestUtils.intercept(YarnException.class,
|
||||||
|
"RMDelegationToken: " + dtId2 + " does not exist.",
|
||||||
|
() -> secretManager.getTokenByRouterStoreToken(dtId2));
|
||||||
|
|
||||||
|
stopSecureRouter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterUpdateNewToken() throws Exception {
|
||||||
|
LOG.info("Test RouterDelegationTokenSecretManager: UpdateNewToken.");
|
||||||
|
|
||||||
|
// Start the Router in Secure Mode
|
||||||
|
startSecureRouter();
|
||||||
|
|
||||||
|
// Store new rm-token
|
||||||
|
RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
|
||||||
|
RouterDelegationTokenSecretManager secretManager =
|
||||||
|
routerClientRMService.getRouterDTSecretManager();
|
||||||
|
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
|
||||||
|
int sequenceNumber = 1;
|
||||||
|
dtId1.setSequenceNumber(sequenceNumber);
|
||||||
|
Long renewDate1 = Time.now();
|
||||||
|
secretManager.storeNewToken(dtId1, renewDate1);
|
||||||
|
|
||||||
|
sequenceNumber++;
|
||||||
|
dtId1.setSequenceNumber(sequenceNumber);
|
||||||
|
secretManager.updateStoredToken(dtId1, renewDate1);
|
||||||
|
|
||||||
|
// query rm-token
|
||||||
|
RMDelegationTokenIdentifier dtId2 = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
|
||||||
|
dtId2.setSequenceNumber(sequenceNumber);
|
||||||
|
RMDelegationTokenIdentifier dtId3 = secretManager.getTokenByRouterStoreToken(dtId2);
|
||||||
|
assertNotNull(dtId3);
|
||||||
|
assertEquals(dtId1.getKind(), dtId3.getKind());
|
||||||
|
assertEquals(dtId1.getOwner(), dtId3.getOwner());
|
||||||
|
assertEquals(dtId1.getRealUser(), dtId3.getRealUser());
|
||||||
|
assertEquals(dtId1.getRenewer(), dtId3.getRenewer());
|
||||||
|
assertEquals(dtId1.getIssueDate(), dtId3.getIssueDate());
|
||||||
|
assertEquals(dtId1.getMasterKeyId(), dtId3.getMasterKeyId());
|
||||||
|
assertEquals(dtId1.getSequenceNumber(), dtId3.getSequenceNumber());
|
||||||
|
assertEquals(sequenceNumber, dtId3.getSequenceNumber());
|
||||||
|
assertEquals(dtId1, dtId3);
|
||||||
|
|
||||||
|
stopSecureRouter();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRouterRemoveToken() throws Exception {
|
||||||
|
LOG.info("Test RouterDelegationTokenSecretManager: RouterRemoveToken.");
|
||||||
|
|
||||||
|
// Start the Router in Secure Mode
|
||||||
|
startSecureRouter();
|
||||||
|
|
||||||
|
// Store new rm-token
|
||||||
|
RouterClientRMService routerClientRMService = this.getRouter().getClientRMProxyService();
|
||||||
|
RouterDelegationTokenSecretManager secretManager =
|
||||||
|
routerClientRMService.getRouterDTSecretManager();
|
||||||
|
RMDelegationTokenIdentifier dtId1 = new RMDelegationTokenIdentifier(
|
||||||
|
new Text("owner1"), new Text("renewer1"), new Text("realuser1"));
|
||||||
|
int sequenceNumber = 1;
|
||||||
|
dtId1.setSequenceNumber(sequenceNumber);
|
||||||
|
Long renewDate1 = Time.now();
|
||||||
|
secretManager.storeNewToken(dtId1, renewDate1);
|
||||||
|
|
||||||
|
// Remove rm-token
|
||||||
|
secretManager.removeStoredToken(dtId1);
|
||||||
|
|
||||||
|
// query rm-token
|
||||||
|
LambdaTestUtils.intercept(YarnException.class,
|
||||||
|
"RMDelegationToken: " + dtId1 + " does not exist.",
|
||||||
|
() -> secretManager.getTokenByRouterStoreToken(dtId1));
|
||||||
|
|
||||||
|
stopSecureRouter();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user