HDFS-15447 RBF: Add top real owners metrics for delegation tokens (#2110)
This commit is contained in:
parent
3e70006639
commit
84b74b335c
@ -22,9 +22,12 @@
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -34,6 +37,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
||||
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.HadoopKerberosName;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
@ -65,6 +70,12 @@ private String formatTokenId(TokenIdent id) {
|
||||
protected final Map<TokenIdent, DelegationTokenInformation> currentTokens
|
||||
= new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Map of token real owners to its token count. This is used to generate
|
||||
* metrics of top users by owned tokens.
|
||||
*/
|
||||
protected final Map<String, Long> tokenOwnerStats = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Sequence number to create DelegationTokenIdentifier.
|
||||
* Protected by this object lock.
|
||||
@ -292,6 +303,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
|
||||
protected void storeToken(TokenIdent ident,
|
||||
DelegationTokenInformation tokenInfo) throws IOException {
|
||||
currentTokens.put(ident, tokenInfo);
|
||||
addTokenForOwnerStats(ident);
|
||||
storeNewToken(ident, tokenInfo.getRenewDate());
|
||||
}
|
||||
|
||||
@ -339,6 +351,7 @@ public synchronized void addPersistedDelegationToken(
|
||||
if (getTokenInfo(identifier) == null) {
|
||||
currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
|
||||
password, getTrackingIdIfEnabled(identifier)));
|
||||
addTokenForOwnerStats(identifier);
|
||||
} else {
|
||||
throw new IOException("Same delegation token being added twice: "
|
||||
+ formatTokenId(identifier));
|
||||
@ -578,6 +591,7 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
||||
if (info == null) {
|
||||
throw new InvalidToken("Token not found " + formatTokenId(id));
|
||||
}
|
||||
removeTokenForOwnerStats(id);
|
||||
removeStoredToken(id);
|
||||
return id;
|
||||
}
|
||||
@ -634,6 +648,7 @@ private void removeExpiredToken() throws IOException {
|
||||
long renewDate = entry.getValue().getRenewDate();
|
||||
if (renewDate < now) {
|
||||
expiredTokens.add(entry.getKey());
|
||||
removeTokenForOwnerStats(entry.getKey());
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
@ -726,4 +741,88 @@ public TokenIdent decodeTokenIdentifier(Token<TokenIdent> token) throws IOExcept
|
||||
return token.decodeIdentifier();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return top token real owners list as well as the tokens count.
|
||||
*
|
||||
* @param n top number of users
|
||||
* @return map of owners to counts
|
||||
*/
|
||||
public List<NameValuePair> getTopTokenRealOwners(int n) {
|
||||
n = Math.min(n, tokenOwnerStats.size());
|
||||
if (n == 0) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
TopN topN = new TopN(n);
|
||||
for (Map.Entry<String, Long> entry : tokenOwnerStats.entrySet()) {
|
||||
topN.offer(new NameValuePair(
|
||||
entry.getKey(), entry.getValue()));
|
||||
}
|
||||
|
||||
List<NameValuePair> list = new ArrayList<>();
|
||||
while (!topN.isEmpty()) {
|
||||
list.add(topN.poll());
|
||||
}
|
||||
Collections.reverse(list);
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the real owner for a token. If this is a token from a proxy user,
|
||||
* the real/effective user will be returned.
|
||||
*
|
||||
* @param id
|
||||
* @return real owner
|
||||
*/
|
||||
private String getTokenRealOwner(TokenIdent id) {
|
||||
String realUser;
|
||||
if (id.getRealUser() != null && !id.getRealUser().toString().isEmpty()) {
|
||||
realUser = id.getRealUser().toString();
|
||||
} else {
|
||||
// if there is no real user -> this is a non proxy user
|
||||
// the user itself is the real owner
|
||||
realUser = id.getUser().getUserName();
|
||||
}
|
||||
return realUser;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add token stats to the owner to token count mapping.
|
||||
*
|
||||
* @param id
|
||||
*/
|
||||
private void addTokenForOwnerStats(TokenIdent id) {
|
||||
String realOwner = getTokenRealOwner(id);
|
||||
tokenOwnerStats.put(realOwner,
|
||||
tokenOwnerStats.getOrDefault(realOwner, 0L)+1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove token stats to the owner to token count mapping.
|
||||
*
|
||||
* @param id
|
||||
*/
|
||||
private void removeTokenForOwnerStats(TokenIdent id) {
|
||||
String realOwner = getTokenRealOwner(id);
|
||||
if (tokenOwnerStats.containsKey(realOwner)) {
|
||||
// unlikely to be less than 1 but in case
|
||||
if (tokenOwnerStats.get(realOwner) <= 1) {
|
||||
tokenOwnerStats.remove(realOwner);
|
||||
} else {
|
||||
tokenOwnerStats.put(realOwner, tokenOwnerStats.get(realOwner)-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method syncs token information from currentTokens to tokenOwnerStats.
|
||||
* It is used when the currentTokens is initialized or refreshed. This is
|
||||
* called from a single thread thus no synchronization is needed.
|
||||
*/
|
||||
protected void syncTokenOwnerStats() {
|
||||
tokenOwnerStats.clear();
|
||||
for (TokenIdent id : currentTokens.keySet()) {
|
||||
addTokenForOwnerStats(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,6 +457,9 @@ private void loadFromZKCache(final boolean isTokenCache) {
|
||||
++count;
|
||||
}
|
||||
}
|
||||
if (isTokenCache) {
|
||||
syncTokenOwnerStats();
|
||||
}
|
||||
if (count > 0) {
|
||||
LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName);
|
||||
}
|
||||
|
@ -124,7 +124,8 @@ public class RBFMetrics implements RouterMBean, FederationMBean {
|
||||
private MountTableStore mountTableStore;
|
||||
/** Router state store. */
|
||||
private RouterStore routerStore;
|
||||
|
||||
/** The number of top token owners reported in metrics. */
|
||||
private int topTokenRealOwners;
|
||||
|
||||
public RBFMetrics(Router router) throws IOException {
|
||||
this.router = router;
|
||||
@ -166,7 +167,9 @@ public RBFMetrics(Router router) throws IOException {
|
||||
Configuration conf = router.getConfig();
|
||||
this.timeOut = conf.getTimeDuration(RBFConfigKeys.DN_REPORT_TIME_OUT,
|
||||
RBFConfigKeys.DN_REPORT_TIME_OUT_MS_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
|
||||
this.topTokenRealOwners = conf.getInt(
|
||||
RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY,
|
||||
RBFConfigKeys.DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -649,6 +652,17 @@ public long getCurrentTokensCount() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTopTokenRealOwners() {
|
||||
RouterSecurityManager mgr =
|
||||
this.router.getRpcServer().getRouterSecurityManager();
|
||||
if (mgr != null && mgr.getSecretManager() != null) {
|
||||
return JSON.toString(mgr.getSecretManager()
|
||||
.getTopTokenRealOwners(this.topTokenRealOwners));
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSecurityEnabled() {
|
||||
return UserGroupInformation.isSecurityEnabled();
|
||||
|
@ -101,4 +101,11 @@ public interface RouterMBean {
|
||||
* @return true, if security is enabled.
|
||||
*/
|
||||
boolean isSecurityEnabled();
|
||||
|
||||
/**
|
||||
* Get the top delegation token owners(realUser).
|
||||
*
|
||||
* @return Json string of owners to token counts
|
||||
*/
|
||||
String getTopTokenRealOwners();
|
||||
}
|
||||
|
@ -79,6 +79,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
||||
public static final Class<? extends RouterRpcMonitor>
|
||||
DFS_ROUTER_METRICS_CLASS_DEFAULT =
|
||||
FederationRPCPerformanceMonitor.class;
|
||||
public static final String DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY =
|
||||
FEDERATION_ROUTER_PREFIX + "top.num.token.realowners";
|
||||
public static final int
|
||||
DFS_ROUTER_METRICS_TOP_NUM_TOKEN_OWNERS_KEY_DEFAULT = 10;
|
||||
|
||||
// HDFS Router heartbeat
|
||||
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
|
||||
|
@ -30,8 +30,6 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@ -197,6 +195,7 @@ private void rebuildTokenCache(boolean initial) throws IOException {
|
||||
}
|
||||
}
|
||||
}
|
||||
syncTokenOwnerStats();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -657,4 +657,14 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.top.num.token.realowners</name>
|
||||
<value>10</value>
|
||||
<description>
|
||||
The number of top real owners by tokens count to report in the JMX metrics.
|
||||
Real owners are the effective users whose cretential are used to generate
|
||||
the tokens.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
</configuration>
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.router.Router;
|
||||
import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
@ -50,6 +51,7 @@
|
||||
|
||||
import org.hamcrest.core.StringContains;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -124,6 +126,72 @@ public void testDelegationTokens() throws IOException {
|
||||
securityManager.renewDelegationToken(token);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelgationTokenTopOwners() throws Exception {
|
||||
UserGroupInformation.reset();
|
||||
List<NameValuePair> topOwners;
|
||||
|
||||
UserGroupInformation user = UserGroupInformation
|
||||
.createUserForTesting("abc", new String[]{"router_group"});
|
||||
UserGroupInformation.setLoginUser(user);
|
||||
Token dt = securityManager.getDelegationToken(new Text("abc"));
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(1, topOwners.size());
|
||||
assertEquals("abc", topOwners.get(0).getName());
|
||||
assertEquals(1, topOwners.get(0).getValue());
|
||||
|
||||
securityManager.renewDelegationToken(dt);
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(1, topOwners.size());
|
||||
assertEquals("abc", topOwners.get(0).getName());
|
||||
assertEquals(1, topOwners.get(0).getValue());
|
||||
|
||||
securityManager.cancelDelegationToken(dt);
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(0, topOwners.size());
|
||||
|
||||
|
||||
// Use proxy user - the code should use the proxy user as the real owner
|
||||
UserGroupInformation routerUser =
|
||||
UserGroupInformation.createRemoteUser("router");
|
||||
UserGroupInformation proxyUser = UserGroupInformation
|
||||
.createProxyUserForTesting("abc",
|
||||
routerUser,
|
||||
new String[]{"router_group"});
|
||||
UserGroupInformation.setLoginUser(proxyUser);
|
||||
|
||||
Token proxyDT = securityManager.getDelegationToken(new Text("router"));
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(1, topOwners.size());
|
||||
assertEquals("router", topOwners.get(0).getName());
|
||||
assertEquals(1, topOwners.get(0).getValue());
|
||||
|
||||
// router to renew tokens
|
||||
UserGroupInformation.setLoginUser(routerUser);
|
||||
securityManager.renewDelegationToken(proxyDT);
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(1, topOwners.size());
|
||||
assertEquals("router", topOwners.get(0).getName());
|
||||
assertEquals(1, topOwners.get(0).getValue());
|
||||
|
||||
securityManager.cancelDelegationToken(proxyDT);
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(0, topOwners.size());
|
||||
|
||||
|
||||
// check rank by more users
|
||||
securityManager.getDelegationToken(new Text("router"));
|
||||
securityManager.getDelegationToken(new Text("router"));
|
||||
UserGroupInformation.setLoginUser(user);
|
||||
securityManager.getDelegationToken(new Text("router"));
|
||||
topOwners = securityManager.getSecretManager().getTopTokenRealOwners(2);
|
||||
assertEquals(2, topOwners.size());
|
||||
assertEquals("router", topOwners.get(0).getName());
|
||||
assertEquals(2, topOwners.get(0).getValue());
|
||||
assertEquals("abc", topOwners.get(1).getName());
|
||||
assertEquals(1, topOwners.get(1).getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVerifyToken() throws IOException {
|
||||
UserGroupInformation.reset();
|
||||
|
Loading…
Reference in New Issue
Block a user