From 0926fa5a2c822fc8baa94f08323893321859838e Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 3 Jan 2023 18:14:02 +0800 Subject: [PATCH] YARN-11225. [Federation] Add postDelegationToken postDelegationTokenExpiration cancelDelegationToken REST APIs for Router. (#5185) --- .../clientrm/RouterClientRMService.java | 12 + .../RouterDelegationTokenSecretManager.java | 10 + .../AbstractRESTRequestInterceptor.java | 13 ++ .../webapp/FederationInterceptorREST.java | 221 +++++++++++++++++- .../router/webapp/RESTRequestInterceptor.java | 15 ++ .../router/webapp/RouterWebServiceUtil.java | 114 +++++++++ .../router/webapp/RouterWebServices.java | 7 + .../TestableFederationClientInterceptor.java | 15 +- .../webapp/BaseRouterWebServicesTest.java | 1 + .../webapp/TestFederationInterceptorREST.java | 220 ++++++++++++++++- 10 files changed, 602 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index 0fb38b2edd..e3e84079b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -611,4 +611,16 @@ protected RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecret public RouterDelegationTokenSecretManager getRouterDTSecretManager() { return routerDTSecretManager; } + + @VisibleForTesting + public void setRouterDTSecretManager(RouterDelegationTokenSecretManager routerDTSecretManager) { + this.routerDTSecretManager = routerDTSecretManager; + } + + @VisibleForTesting + public void initUserPipelineMap(Configuration conf) { + int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); + this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java index 79790cde30..918bf16e4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/security/RouterDelegationTokenSecretManager.java @@ -252,6 +252,16 @@ public synchronized Map getAllTokens() { return allTokens; } + public long getRenewDate(RMDelegationTokenIdentifier ident) + throws InvalidToken { + DelegationTokenInformation info = currentTokens.get(ident); + if (info == null) { + throw new InvalidToken("token (" + ident.toString() + + ") can't be found in cache"); + } + return info.getRenewDate(); + } + @Override protected synchronized int incrementDelegationTokenSeqNum() { return federationFacade.incrementDelegationTokenSeqNum(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java index baf931ac46..ad79addfca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/AbstractRESTRequestInterceptor.java @@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; + import org.apache.hadoop.yarn.server.router.RouterServerUtil; /** @@ -32,6 +34,7 @@ public abstract class AbstractRESTRequestInterceptor private Configuration conf; private RESTRequestInterceptor nextInterceptor; private UserGroupInformation user = null; + private RouterClientRMService routerClientRMService = null; /** * Sets the {@link RESTRequestInterceptor} in the chain. @@ -93,4 +96,14 @@ public RESTRequestInterceptor getNextInterceptor() { public UserGroupInformation getUser() { return user; } + + @Override + public RouterClientRMService getRouterClientRMService() { + return routerClientRMService; + } + + @Override + public void setRouterClientRMService(RouterClientRMService routerClientRMService) { + this.routerClientRMService = routerClientRMService; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 93e7a16cd9..f48ead04a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.Method; import java.security.Principal; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -46,11 +47,20 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Sets; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -61,6 +71,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; @@ -107,8 +118,11 @@ import org.apache.hadoop.yarn.server.router.RouterMetrics; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; @@ -124,6 +138,9 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.extractToken; +import static org.apache.hadoop.yarn.server.router.webapp.RouterWebServiceUtil.getKerberosUserGroupInformation; + /** * Extends the {@code AbstractRESTRequestInterceptor} class and provides an * implementation for federation of YARN RM and scaling an application across @@ -1567,25 +1584,209 @@ public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr, throw new RuntimeException("updateAppQueue Failed."); } + /** + * This method posts a delegation token from the client. + * + * @param tokenData the token to delegate. It is a content param. + * @param hsr the servlet request. + * @return Response containing the status code. + * @throws AuthorizationException if Kerberos auth failed. + * @throws IOException if the delegation failed. + * @throws InterruptedException if interrupted. + * @throws Exception in case of bad request. + */ @Override - public Response postDelegationToken(DelegationToken tokenData, - HttpServletRequest hsr) throws AuthorizationException, IOException, - InterruptedException, Exception { - throw new NotImplementedException("Code is not implemented"); + public Response postDelegationToken(DelegationToken tokenData, HttpServletRequest hsr) + throws AuthorizationException, IOException, InterruptedException, Exception { + + if (tokenData == null || hsr == null) { + throw new IllegalArgumentException("Parameter error, the tokenData or hsr is null."); + } + + try { + // get Caller UserGroupInformation + Configuration conf = federationFacade.getConf(); + UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr); + + // create a delegation token + return createDelegationToken(tokenData, callerUGI); + } catch (YarnException e) { + LOG.error("Create delegation token request failed.", e); + return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build(); + } } + /** + * Create DelegationToken. + * + * @param dtoken DelegationToken Data. + * @param callerUGI UserGroupInformation. + * @return Response. + * @throws Exception An exception occurred when creating a delegationToken. + */ + private Response createDelegationToken(DelegationToken dtoken, UserGroupInformation callerUGI) + throws IOException, InterruptedException { + + String renewer = dtoken.getRenewer(); + + GetDelegationTokenResponse resp = callerUGI.doAs( + (PrivilegedExceptionAction) () -> { + GetDelegationTokenRequest createReq = GetDelegationTokenRequest.newInstance(renewer); + return this.getRouterClientRMService().getDelegationToken(createReq); + }); + + DelegationToken respToken = getDelegationToken(renewer, resp); + return Response.status(Status.OK).entity(respToken).build(); + } + + /** + * Get DelegationToken. + * + * @param renewer renewer. + * @param resp GetDelegationTokenResponse. + * @return DelegationToken. + * @throws IOException if there are I/O errors. + */ + private DelegationToken getDelegationToken(String renewer, GetDelegationTokenResponse resp) + throws IOException { + // Step1. Parse token from GetDelegationTokenResponse. + Token tk = getToken(resp); + String tokenKind = tk.getKind().toString(); + RMDelegationTokenIdentifier tokenIdentifier = tk.decodeIdentifier(); + String owner = tokenIdentifier.getOwner().toString(); + long maxDate = tokenIdentifier.getMaxDate(); + + // Step2. Call the interface to get the expiration time of Token. + RouterClientRMService clientRMService = this.getRouterClientRMService(); + RouterDelegationTokenSecretManager tokenSecretManager = + clientRMService.getRouterDTSecretManager(); + long currentExpiration = tokenSecretManager.getRenewDate(tokenIdentifier); + + // Step3. Generate Delegation token. + DelegationToken delegationToken = new DelegationToken(tk.encodeToUrlString(), + renewer, owner, tokenKind, currentExpiration, maxDate); + + return delegationToken; + } + + /** + * GetToken. + * We convert RMDelegationToken in GetDelegationTokenResponse to Token. + * + * @param resp GetDelegationTokenResponse. + * @return Token. + */ + private static Token getToken(GetDelegationTokenResponse resp) { + org.apache.hadoop.yarn.api.records.Token token = resp.getRMDelegationToken(); + byte[] identifier = token.getIdentifier().array(); + byte[] password = token.getPassword().array(); + Text kind = new Text(token.getKind()); + Text service = new Text(token.getService()); + Token tk = new Token<>(identifier, password, kind, service); + return tk; + } + + /** + * This method updates the expiration for a delegation token from the client. + * + * @param hsr the servlet request + * @return Response containing the status code. + * @throws AuthorizationException if Kerberos auth failed. + * @throws IOException if the delegation failed. + * @throws InterruptedException if interrupted. + * @throws Exception in case of bad request. + */ @Override public Response postDelegationTokenExpiration(HttpServletRequest hsr) - throws AuthorizationException, IOException, InterruptedException, - Exception { - throw new NotImplementedException("Code is not implemented"); + throws AuthorizationException, IOException, InterruptedException, Exception { + + if (hsr == null) { + throw new IllegalArgumentException("Parameter error, the hsr is null."); + } + + try { + // get Caller UserGroupInformation + Configuration conf = federationFacade.getConf(); + UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr); + return renewDelegationToken(hsr, callerUGI); + } catch (YarnException e) { + LOG.error("Renew delegation token request failed.", e); + return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build(); + } } + /** + * Renew DelegationToken. + * + * @param hsr HttpServletRequest. + * @param callerUGI UserGroupInformation. + * @return Response + * @throws IOException if there are I/O errors. + * @throws InterruptedException if any thread has interrupted. + */ + private Response renewDelegationToken(HttpServletRequest hsr, UserGroupInformation callerUGI) + throws IOException, InterruptedException { + + // renew Delegation Token + DelegationToken tokenData = new DelegationToken(); + String encodeToken = extractToken(hsr).encodeToUrlString(); + tokenData.setToken(encodeToken); + + // Parse token data + Token token = extractToken(tokenData.getToken()); + org.apache.hadoop.yarn.api.records.Token dToken = + BuilderUtils.newDelegationToken(token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + + // Renew token + RenewDelegationTokenResponse resp = callerUGI.doAs( + (PrivilegedExceptionAction) () -> { + RenewDelegationTokenRequest req = RenewDelegationTokenRequest.newInstance(dToken); + return this.getRouterClientRMService().renewDelegationToken(req); + }); + + // return DelegationToken + long renewTime = resp.getNextExpirationTime(); + DelegationToken respToken = new DelegationToken(); + respToken.setNextExpirationTime(renewTime); + return Response.status(Status.OK).entity(respToken).build(); + } + + /** + * Cancel DelegationToken. + * + * @param hsr the servlet request + * @return Response containing the status code. + * @throws AuthorizationException if Kerberos auth failed. + * @throws IOException if the delegation failed. + * @throws InterruptedException if interrupted. + * @throws Exception in case of bad request. + */ @Override public Response cancelDelegationToken(HttpServletRequest hsr) - throws AuthorizationException, IOException, InterruptedException, - Exception { - throw new NotImplementedException("Code is not implemented"); + throws AuthorizationException, IOException, InterruptedException, Exception { + try { + // get Caller UserGroupInformation + Configuration conf = federationFacade.getConf(); + UserGroupInformation callerUGI = getKerberosUserGroupInformation(conf, hsr); + + // parse Token Data + Token token = extractToken(hsr); + org.apache.hadoop.yarn.api.records.Token dToken = BuilderUtils + .newDelegationToken(token.getIdentifier(), token.getKind().toString(), + token.getPassword(), token.getService().toString()); + + // cancelDelegationToken + callerUGI.doAs((PrivilegedExceptionAction) () -> { + CancelDelegationTokenRequest req = CancelDelegationTokenRequest.newInstance(dToken); + return this.getRouterClientRMService().cancelDelegationToken(req); + }); + + return Response.status(Status.OK).build(); + } catch (YarnException e) { + LOG.error("Cancel delegation token request failed.", e); + return Response.status(Status.FORBIDDEN).entity(e.getMessage()).build(); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java index 917809ad6c..2724cdd5ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RESTRequestInterceptor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; import org.apache.hadoop.yarn.server.webapp.WebServices; import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; @@ -122,4 +123,18 @@ ContainersInfo getContainers(HttpServletRequest req, HttpServletResponse res, */ ContainerInfo getContainer(HttpServletRequest req, HttpServletResponse res, String appId, String appAttemptId, String containerId); + + /** + * Set RouterClientRMService. + * + * @param routerClientRMService routerClientRMService. + */ + void setRouterClientRMService(RouterClientRMService routerClientRMService); + + /** + * Get RouterClientRMService. + * + * @return RouterClientRMService + */ + RouterClientRMService getRouterClientRMService(); } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java index 4182a12f30..e33ce15507 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java @@ -20,6 +20,7 @@ import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; import static javax.servlet.http.HttpServletResponse.SC_OK; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DELEGATION_TOKEN_HEADER; import java.io.IOException; import java.net.InetSocketAddress; @@ -43,11 +44,18 @@ import javax.ws.rs.core.Response.ResponseBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; +import org.apache.hadoop.security.authorize.AuthorizationException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; @@ -614,4 +622,110 @@ private static NodeLabelInfo mergeNodeLabelInfo(NodeLabelInfo left, NodeLabelInf resultNodeLabelInfo.setPartitionInfo(newPartitionInfo); return resultNodeLabelInfo; } + + /** + * initForWritableEndpoints does the init and acls verification for all + * writable REST end points. + * + * @param conf Configuration. + * @param callerUGI remote caller who initiated the request. + * @throws AuthorizationException in case of no access to perfom this op. + */ + public static void initForWritableEndpoints(Configuration conf, UserGroupInformation callerUGI) + throws AuthorizationException { + if (callerUGI == null) { + String msg = "Unable to obtain user name, user not authenticated"; + throw new AuthorizationException(msg); + } + + if (UserGroupInformation.isSecurityEnabled() && isStaticUser(conf, callerUGI)) { + String msg = "The default static user cannot carry out this operation."; + throw new ForbiddenException(msg); + } + } + + /** + * Determine whether the user is a static user. + * + * @param conf Configuration. + * @param callerUGI remote caller who initiated the request. + * @return true, static user; false, not static user; + */ + private static boolean isStaticUser(Configuration conf, UserGroupInformation callerUGI) { + String staticUser = conf.get(CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER, + CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER); + return staticUser.equals(callerUGI.getUserName()); + } + + public static void createKerberosUserGroupInformation(HttpServletRequest hsr) + throws YarnException { + String authType = hsr.getAuthType(); + + if (!KerberosAuthenticationHandler.TYPE.equalsIgnoreCase(authType)) { + String msg = "Delegation token operations can only be carried out on a " + + "Kerberos authenticated channel. Expected auth type is " + + KerberosAuthenticationHandler.TYPE + ", got type " + authType; + throw new YarnException(msg); + } + + Object ugiAttr = + hsr.getAttribute(DelegationTokenAuthenticationHandler.DELEGATION_TOKEN_UGI_ATTRIBUTE); + if (ugiAttr != null) { + String msg = "Delegation token operations cannot be carried out using " + + "delegation token authentication."; + throw new YarnException(msg); + } + } + + /** + * Parse Token data. + * + * @param encodedToken tokenData + * @return RMDelegationTokenIdentifier. + */ + public static Token extractToken(String encodedToken) { + Token token = new Token<>(); + try { + token.decodeFromUrlString(encodedToken); + } catch (Exception ie) { + throw new BadRequestException("Could not decode encoded token"); + } + return token; + } + + public static Token extractToken(HttpServletRequest request) { + String encodedToken = request.getHeader(DELEGATION_TOKEN_HEADER); + if (encodedToken == null) { + String msg = "Header '" + DELEGATION_TOKEN_HEADER + + "' containing encoded token not found"; + throw new BadRequestException(msg); + } + return extractToken(encodedToken); + } + + /** + * Get Kerberos UserGroupInformation. + * + * Parse ugi from hsr and set kerberos authentication attributes. + * + * @param conf Configuration. + * @param request the servlet request. + * @return UserGroupInformation. + * @throws AuthorizationException if Kerberos auth failed. + * @throws YarnException If Authentication Type verification fails. + */ + public static UserGroupInformation getKerberosUserGroupInformation(Configuration conf, + HttpServletRequest request) throws AuthorizationException, YarnException { + // Parse ugi from hsr And Check ugi as expected. + // If ugi is empty or user is a static user, an exception will be thrown. + UserGroupInformation callerUGI = RMWebAppUtil.getCallerUserGroupInformation(request, true); + initForWritableEndpoints(conf, callerUGI); + + // Set AuthenticationMethod Kerberos for ugi. + createKerberosUserGroupInformation(request); + callerUGI.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS); + + // return caller UGI + return callerUGI; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 02f545beda..c9c56c46c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.router.Router; import org.apache.hadoop.yarn.server.router.RouterServerUtil; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -208,6 +209,8 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { RESTRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); + RouterClientRMService routerClientRMService = router.getClientRMProxyService(); + interceptorChain.setRouterClientRMService(routerClientRMService); chainWrapper.init(interceptorChain); } catch (Exception e) { LOG.error("Init RESTRequestInterceptor error for user: {}", user, e); @@ -954,4 +957,8 @@ public NodeLabelsInfo getRMNodeLabels(@Context HttpServletRequest hsr) RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getRMNodeLabels(hsr); } + + public Router getRouter() { + return router; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index c8c647a0d2..fa25bc4d0a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -224,17 +224,20 @@ public void shutdown() { public RouterDelegationTokenSecretManager createRouterRMDelegationTokenSecretManager( Configuration conf) { - long secretKeyInterval = conf.getLong( + long secretKeyInterval = conf.getTimeDuration( YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, - YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); - long tokenMaxLifetime = conf.getLong( + long tokenMaxLifetime = conf.getTimeDuration( YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, - YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT, + TimeUnit.MILLISECONDS); - long tokenRenewInterval = conf.getLong( + long tokenRenewInterval = conf.getTimeDuration( YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, - YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); long removeScanInterval = conf.getTimeDuration( YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 9b086e5103..423e0e5a38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -86,6 +86,7 @@ public abstract class BaseRouterWebServicesTest { @Before public void setUp() throws YarnException, IOException { + this.conf = createConfiguration(); router = spy(new Router()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java index 14533d1087..910dbeb62d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.HashSet; import java.util.Collections; +import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; @@ -92,7 +93,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken; import org.apache.hadoop.yarn.server.resourcemanager.webapp.NodeIDsInfo; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService; +import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService.RequestInterceptorChainWrapper; +import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor; +import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; @@ -108,12 +114,23 @@ import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.apache.hadoop.yarn.util.MonotonicClock; import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.BadRequestException; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY; + import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.DURATION; import static org.apache.hadoop.yarn.server.router.webapp.MockDefaultRequestInterceptorREST.NUM_CONTAINERS; import static org.mockito.Mockito.mock; @@ -137,9 +154,10 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest { private MemoryFederationStateStore stateStore; private FederationStateStoreTestUtil stateStoreUtil; private List subClusters; + private static final String TEST_RENEWER = "test-renewer"; - @Override public void setUp() throws YarnException, IOException { + super.setUpConfig(); interceptor = new TestableFederationInterceptorREST(); @@ -154,17 +172,38 @@ public void setUp() throws YarnException, IOException { subClusters = new ArrayList<>(); - try { - for (int i = 0; i < NUM_SUBCLUSTER; i++) { - SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); - stateStoreUtil.registerSubCluster(sc); - subClusters.add(sc); - } - } catch (YarnException e) { - LOG.error(e.getMessage()); - Assert.fail(); + for (int i = 0; i < NUM_SUBCLUSTER; i++) { + SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); + stateStoreUtil.registerSubCluster(sc); + subClusters.add(sc); } + RouterClientRMService routerClientRMService = new RouterClientRMService(); + routerClientRMService.initUserPipelineMap(getConf()); + long secretKeyInterval = this.getConf().getLong( + RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY, RM_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); + long tokenMaxLifetime = this.getConf().getLong( + RM_DELEGATION_TOKEN_MAX_LIFETIME_KEY, RM_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT); + long tokenRenewInterval = this.getConf().getLong( + RM_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, RM_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); + long removeScanInterval = this.getConf().getTimeDuration( + RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_KEY, + RM_DELEGATION_TOKEN_REMOVE_SCAN_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); + RouterDelegationTokenSecretManager tokenSecretManager = new RouterDelegationTokenSecretManager( + secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, removeScanInterval); + tokenSecretManager.startThreads(); + routerClientRMService.setRouterDTSecretManager(tokenSecretManager); + + TestableFederationClientInterceptor clientInterceptor = + new TestableFederationClientInterceptor(); + clientInterceptor.setConf(this.getConf()); + clientInterceptor.init(TEST_RENEWER); + clientInterceptor.setTokenSecretManager(tokenSecretManager); + RequestInterceptorChainWrapper wrapper = new RequestInterceptorChainWrapper(); + wrapper.init(clientInterceptor); + routerClientRMService.getUserPipelineMap().put(TEST_RENEWER, wrapper); + interceptor.setRouterClientRMService(routerClientRMService); + for (SubClusterId subCluster : subClusters) { SubClusterInfo subClusterInfo = stateStoreUtil.querySubClusterInfo(subCluster); interceptor.getOrCreateInterceptorForSubCluster( @@ -172,6 +211,7 @@ public void setUp() throws YarnException, IOException { } interceptor.setupResourceManager(); + } @Override @@ -1485,4 +1525,164 @@ public void testCheckFederationInterceptorRESTClient() { Assert.assertNotNull(interceptorREST.getClient()); Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress()); } + + @Test + public void testPostDelegationTokenErrorHsr() throws Exception { + // Prepare delegationToken data + DelegationToken token = new DelegationToken(); + token.setRenewer(TEST_RENEWER); + + HttpServletRequest request = mock(HttpServletRequest.class); + + // If we don't set token + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Parameter error, the tokenData or hsr is null.", + () -> interceptor.postDelegationToken(null, request)); + + // If we don't set hsr + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Parameter error, the tokenData or hsr is null.", + () -> interceptor.postDelegationToken(token, null)); + + // If we don't set renewUser, we will get error message. + LambdaTestUtils.intercept(AuthorizationException.class, + "Unable to obtain user name, user not authenticated", + () -> interceptor.postDelegationToken(token, request)); + + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(TEST_RENEWER); + when(request.getRemoteUser()).thenReturn(TEST_RENEWER); + when(request.getUserPrincipal()).thenReturn(principal); + + // If we don't set the authentication type, we will get error message. + Response response = interceptor.postDelegationToken(token, request); + Assert.assertNotNull(response); + Assert.assertEquals(response.getStatus(), Status.FORBIDDEN.getStatusCode()); + String errMsg = "Delegation token operations can only be carried out on a " + + "Kerberos authenticated channel. Expected auth type is kerberos, got type null"; + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof String); + String entityMsg = String.valueOf(entity); + Assert.assertTrue(errMsg.contains(entityMsg)); + } + + @Test + public void testPostDelegationToken() throws Exception { + Long now = Time.now(); + + DelegationToken token = new DelegationToken(); + token.setRenewer(TEST_RENEWER); + + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(TEST_RENEWER); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(TEST_RENEWER); + when(request.getUserPrincipal()).thenReturn(principal); + when(request.getAuthType()).thenReturn("kerberos"); + + Response response = interceptor.postDelegationToken(token, request); + Assert.assertNotNull(response); + + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof DelegationToken); + + DelegationToken dtoken = DelegationToken.class.cast(entity); + Assert.assertEquals(TEST_RENEWER, dtoken.getRenewer()); + Assert.assertEquals(TEST_RENEWER, dtoken.getOwner()); + Assert.assertEquals("RM_DELEGATION_TOKEN", dtoken.getKind()); + Assert.assertNotNull(dtoken.getToken()); + Assert.assertTrue(dtoken.getNextExpirationTime() > now); + } + + @Test + public void testPostDelegationTokenExpirationError() throws Exception { + + // If we don't set hsr + LambdaTestUtils.intercept(IllegalArgumentException.class, + "Parameter error, the hsr is null.", + () -> interceptor.postDelegationTokenExpiration(null)); + + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(TEST_RENEWER); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(TEST_RENEWER); + when(request.getUserPrincipal()).thenReturn(principal); + when(request.getAuthType()).thenReturn("kerberos"); + + // If we don't set the header. + String errorMsg = "Header 'Hadoop-YARN-RM-Delegation-Token' containing encoded token not found"; + LambdaTestUtils.intercept(BadRequestException.class, errorMsg, + () -> interceptor.postDelegationTokenExpiration(request)); + } + + @Test + public void testPostDelegationTokenExpiration() throws Exception { + + DelegationToken token = new DelegationToken(); + token.setRenewer(TEST_RENEWER); + + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(TEST_RENEWER); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(TEST_RENEWER); + when(request.getUserPrincipal()).thenReturn(principal); + when(request.getAuthType()).thenReturn("kerberos"); + + Response response = interceptor.postDelegationToken(token, request); + Assert.assertNotNull(response); + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof DelegationToken); + DelegationToken dtoken = DelegationToken.class.cast(entity); + + final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token"; + when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken()); + + Response renewResponse = interceptor.postDelegationTokenExpiration(request); + Assert.assertNotNull(renewResponse); + + Object renewEntity = renewResponse.getEntity(); + Assert.assertNotNull(renewEntity); + Assert.assertTrue(renewEntity instanceof DelegationToken); + + // renewDelegation, we only return renewDate, other values are NULL. + DelegationToken renewDToken = DelegationToken.class.cast(renewEntity); + Assert.assertNull(renewDToken.getRenewer()); + Assert.assertNull(renewDToken.getOwner()); + Assert.assertNull(renewDToken.getKind()); + Assert.assertTrue(renewDToken.getNextExpirationTime() > dtoken.getNextExpirationTime()); + } + + @Test + public void testCancelDelegationToken() throws Exception { + DelegationToken token = new DelegationToken(); + token.setRenewer(TEST_RENEWER); + + Principal principal = mock(Principal.class); + when(principal.getName()).thenReturn(TEST_RENEWER); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRemoteUser()).thenReturn(TEST_RENEWER); + when(request.getUserPrincipal()).thenReturn(principal); + when(request.getAuthType()).thenReturn("kerberos"); + + Response response = interceptor.postDelegationToken(token, request); + Assert.assertNotNull(response); + Object entity = response.getEntity(); + Assert.assertNotNull(entity); + Assert.assertTrue(entity instanceof DelegationToken); + DelegationToken dtoken = DelegationToken.class.cast(entity); + + final String yarnTokenHeader = "Hadoop-YARN-RM-Delegation-Token"; + when(request.getHeader(yarnTokenHeader)).thenReturn(dtoken.getToken()); + + Response cancelResponse = interceptor.cancelDelegationToken(request); + Assert.assertNotNull(cancelResponse); + Assert.assertEquals(response.getStatus(), Status.OK.getStatusCode()); + } } \ No newline at end of file