From ffa9ed93a4f7323989fd4f829683adbabffa2371 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Wed, 10 Aug 2022 05:44:29 +0800 Subject: [PATCH] YARN-6572. Refactoring Router services to use common util classes for pipeline creations. (#4594) --- .../server/utils/YarnServerBuilderUtils.java | 2 +- .../yarn/server/router/RouterServerUtil.java | 77 ++++++++++++++++ .../clientrm/RouterClientRMService.java | 87 +++--------------- .../router/rmadmin/RouterRMAdminService.java | 86 +++--------------- .../router/webapp/RouterWebServices.java | 88 +++---------------- .../rmadmin/TestRouterRMAdminService.java | 3 +- 6 files changed, 116 insertions(+), 227 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java index 9ee68d12c7..6a5d22affa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/YarnServerBuilderUtils.java @@ -79,7 +79,7 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId, * * @param applicationId Application ID * @param credentials HDFS Tokens - * @return systemCredentialsForAppsProto SystemCredentialsForAppsProto + * @return systemCredentialsForAppsProto */ public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto( ApplicationId applicationId, ByteBuffer credentials) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 65f2b68bfc..36f02dd3e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -21,10 +21,19 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.io.IOException; /** @@ -84,6 +93,74 @@ public static void logAndThrowException(String errMsg, Throwable t) } } + public static R createRequestInterceptorChain(Configuration conf, String pipeLineClassName, + String interceptorClassName, Class clazz) { + + List interceptorClassNames = getInterceptorClassNames(conf, + pipeLineClassName, interceptorClassName); + + R pipeline = null; + R current = null; + + for (String className : interceptorClassNames) { + try { + Class interceptorClass = conf.getClassByName(className); + if (clazz.isAssignableFrom(interceptorClass)) { + Object interceptorInstance = ReflectionUtils.newInstance(interceptorClass, conf); + if (pipeline == null) { + pipeline = clazz.cast(interceptorInstance); + current = clazz.cast(interceptorInstance); + continue; + } else { + Method method = clazz.getMethod("setNextInterceptor", clazz); + method.invoke(current, interceptorInstance); + current = clazz.cast(interceptorInstance); + } + } else { + LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName()); + throw new YarnRuntimeException("Class: " + className + " not instance of " + + clazz.getCanonicalName()); + } + } catch (ClassNotFoundException e) { + LOG.error("Could not instantiate RequestInterceptor: {}", className, e); + throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e); + } catch (InvocationTargetException e) { + LOG.error("RequestInterceptor {} call setNextInterceptor error.", className, e); + throw new YarnRuntimeException("RequestInterceptor " + className + + " call setNextInterceptor error.", e); + } catch (NoSuchMethodException e) { + LOG.error("RequestInterceptor {} does not contain the method setNextInterceptor.", + className); + throw new YarnRuntimeException("RequestInterceptor " + className + + " does not contain the method setNextInterceptor.", e); + } catch (IllegalAccessException e) { + LOG.error("RequestInterceptor {} call the method setNextInterceptor " + + "does not have access.", className); + throw new YarnRuntimeException("RequestInterceptor " + + className + " call the method setNextInterceptor does not have access.", e); + } + } + + if (pipeline == null) { + throw new YarnRuntimeException( + "RequestInterceptor pipeline is not configured in the system."); + } + + return pipeline; + } + + private static List getInterceptorClassNames(Configuration conf, + String pipeLineClass, String interceptorClass) { + String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass); + List interceptorClassNames = new ArrayList<>(); + Collection tempList = + StringUtils.getStringCollection(configuredInterceptorClassNames); + for (String item : tempList) { + interceptorClassNames.add(item.trim()); + } + return interceptorClassNames; + } + /** * Throws an IOException due to an error. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java index acf5bfa8fb..fad68661d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java @@ -20,10 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -33,8 +30,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -108,8 +103,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.slf4j.Logger; @@ -147,7 +142,7 @@ public RouterClientRMService() { @Override protected void serviceStart() throws Exception { - LOG.info("Starting Router ClientRMService"); + LOG.info("Starting Router ClientRMService."); Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); UserGroupInformation.setConfiguration(conf); @@ -161,9 +156,7 @@ protected void serviceStart() throws Exception { int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); - this.userPipelineMap = Collections.synchronizedMap( - new LRUCacheHashMap( - maxCacheSize, true)); + this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true)); Configuration serverConf = new Configuration(conf); @@ -181,14 +174,13 @@ protected void serviceStart() throws Exception { } this.server.start(); - LOG.info("Router ClientRMService listening on address: " - + this.server.getListenerAddress()); + LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress()); super.serviceStart(); } @Override protected void serviceStop() throws Exception { - LOG.info("Stopping Router ClientRMService"); + LOG.info("Stopping Router ClientRMService."); if (this.server != null) { this.server.stop(); } @@ -201,27 +193,6 @@ public Server getServer() { return this.server; } - /** - * Returns the comma separated interceptor class names from the configuration. - * - * @param conf - * @return the interceptor class names as an instance of ArrayList - */ - private List getInterceptorClassNames(Configuration conf) { - String configuredInterceptorClassNames = - conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, - YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS); - - List interceptorClassNames = new ArrayList(); - Collection tempList = - StringUtils.getStringCollection(configuredInterceptorClassNames); - for (String item : tempList) { - interceptorClassNames.add(item.trim()); - } - - return interceptorClassNames; - } - @Override public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException { @@ -507,44 +478,10 @@ protected Map getPipelines() { @VisibleForTesting protected ClientRequestInterceptor createRequestInterceptorChain() { Configuration conf = getConfig(); - - List interceptorClassNames = getInterceptorClassNames(conf); - - ClientRequestInterceptor pipeline = null; - ClientRequestInterceptor current = null; - for (String interceptorClassName : interceptorClassNames) { - try { - Class interceptorClass = conf.getClassByName(interceptorClassName); - if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) { - ClientRequestInterceptor interceptorInstance = - (ClientRequestInterceptor) ReflectionUtils - .newInstance(interceptorClass, conf); - if (pipeline == null) { - pipeline = interceptorInstance; - current = interceptorInstance; - continue; - } else { - current.setNextInterceptor(interceptorInstance); - current = interceptorInstance; - } - } else { - throw new YarnRuntimeException( - "Class: " + interceptorClassName + " not instance of " - + ClientRequestInterceptor.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException( - "Could not instantiate ApplicationClientRequestInterceptor: " - + interceptorClassName, - e); - } - } - - if (pipeline == null) { - throw new YarnRuntimeException( - "RequestInterceptor pipeline is not configured in the system"); - } - return pipeline; + return RouterServerUtil.createRequestInterceptorChain(conf, + YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS, + ClientRequestInterceptor.class); } /** @@ -565,15 +502,15 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { try { // We should init the pipeline instance after it is created and then // add to the map, to ensure thread safe. - LOG.info("Initializing request processing pipeline for application " - + "for the user: {}", user); + LOG.info("Initializing request processing pipeline for application for the user: {}.", + user); ClientRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - LOG.error("Init ClientRequestInterceptor error for user: " + user, e); + LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e); throw e; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index 3e23d6fcf9..56378d4d9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -20,10 +20,7 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -34,11 +31,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; @@ -69,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; import org.slf4j.Logger; @@ -106,7 +101,7 @@ public RouterRMAdminService() { @Override protected void serviceStart() throws Exception { - LOG.info("Starting Router RMAdmin Service"); + LOG.info("Starting Router RMAdmin Service."); Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); UserGroupInformation.setConfiguration(conf); @@ -120,9 +115,7 @@ protected void serviceStart() throws Exception { int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); - this.userPipelineMap = Collections.synchronizedMap( - new LRUCacheHashMap( - maxCacheSize, true)); + this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true)); Configuration serverConf = new Configuration(conf); @@ -139,14 +132,13 @@ protected void serviceStart() throws Exception { } this.server.start(); - LOG.info("Router RMAdminService listening on address: " - + this.server.getListenerAddress()); + LOG.info("Router RMAdminService listening on address: {}.", this.server.getListenerAddress()); super.serviceStart(); } @Override protected void serviceStop() throws Exception { - LOG.info("Stopping Router RMAdminService"); + LOG.info("Stopping Router RMAdminService."); if (this.server != null) { this.server.stop(); } @@ -164,27 +156,6 @@ public Server getServer() { return this.server; } - /** - * Returns the comma separated interceptor class names from the configuration. - * - * @param conf - * @return the interceptor class names as an instance of ArrayList - */ - private List getInterceptorClassNames(Configuration conf) { - String configuredInterceptorClassNames = - conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, - YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS); - - List interceptorClassNames = new ArrayList(); - Collection tempList = - StringUtils.getStringCollection(configuredInterceptorClassNames); - for (String item : tempList) { - interceptorClassNames.add(item.trim()); - } - - return interceptorClassNames; - } - @VisibleForTesting protected RequestInterceptorChainWrapper getInterceptorChain() throws IOException { @@ -215,45 +186,10 @@ protected Map getPipelines() { @VisibleForTesting protected RMAdminRequestInterceptor createRequestInterceptorChain() { Configuration conf = getConfig(); - - List interceptorClassNames = getInterceptorClassNames(conf); - - RMAdminRequestInterceptor pipeline = null; - RMAdminRequestInterceptor current = null; - for (String interceptorClassName : interceptorClassNames) { - try { - Class interceptorClass = conf.getClassByName(interceptorClassName); - if (RMAdminRequestInterceptor.class - .isAssignableFrom(interceptorClass)) { - RMAdminRequestInterceptor interceptorInstance = - (RMAdminRequestInterceptor) ReflectionUtils - .newInstance(interceptorClass, conf); - if (pipeline == null) { - pipeline = interceptorInstance; - current = interceptorInstance; - continue; - } else { - current.setNextInterceptor(interceptorInstance); - current = interceptorInstance; - } - } else { - throw new YarnRuntimeException( - "Class: " + interceptorClassName + " not instance of " - + RMAdminRequestInterceptor.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException( - "Could not instantiate RMAdminRequestInterceptor: " - + interceptorClassName, - e); - } - } - - if (pipeline == null) { - throw new YarnRuntimeException( - "RequestInterceptor pipeline is not configured in the system"); - } - return pipeline; + return RouterServerUtil.createRequestInterceptorChain(conf, + YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS, + RMAdminRequestInterceptor.class); } /** @@ -274,14 +210,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { try { // We should init the pipeline instance after it is created and then // add to the map, to ensure thread safe. - LOG.info("Initializing request processing pipeline for user: {}", user); + LOG.info("Initializing request processing pipeline for user: {}.", user); RMAdminRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e); + LOG.error("Init RMAdminRequestInterceptor error for user: {}.", user, e); throw e; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 8faf0619f3..b1dc8635b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -19,10 +19,7 @@ package org.apache.hadoop.yarn.server.router.webapp; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Set; @@ -48,11 +45,8 @@ import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; @@ -86,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo; import org.apache.hadoop.yarn.server.router.Router; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -136,30 +131,7 @@ public RouterWebServices(final Router router, Configuration conf) { int maxCacheSize = conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE); - this.userPipelineMap = Collections.synchronizedMap( - new LRUCacheHashMap( - maxCacheSize, true)); - } - - /** - * Returns the comma separated interceptor class names from the configuration. - * - * @param conf - * @return the interceptor class names as an instance of ArrayList - */ - private List getInterceptorClassNames(Configuration config) { - String configuredInterceptorClassNames = - config.get(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE, - YarnConfiguration.DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS); - - List interceptorClassNames = new ArrayList(); - Collection tempList = - StringUtils.getStringCollection(configuredInterceptorClassNames); - for (String item : tempList) { - interceptorClassNames.add(item.trim()); - } - - return interceptorClassNames; + this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true)); } private void init() { @@ -207,50 +179,16 @@ protected Map getPipelines() { */ @VisibleForTesting protected RESTRequestInterceptor createRequestInterceptorChain() { - - List interceptorClassNames = getInterceptorClassNames(conf); - - RESTRequestInterceptor pipeline = null; - RESTRequestInterceptor current = null; - for (String interceptorClassName : interceptorClassNames) { - try { - Class interceptorClass = conf.getClassByName(interceptorClassName); - if (RESTRequestInterceptor.class.isAssignableFrom(interceptorClass)) { - RESTRequestInterceptor interceptorInstance = - (RESTRequestInterceptor) ReflectionUtils - .newInstance(interceptorClass, conf); - if (pipeline == null) { - pipeline = interceptorInstance; - current = interceptorInstance; - continue; - } else { - current.setNextInterceptor(interceptorInstance); - current = interceptorInstance; - } - } else { - throw new YarnRuntimeException( - "Class: " + interceptorClassName + " not instance of " - + RESTRequestInterceptor.class.getCanonicalName()); - } - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException( - "Could not instantiate RESTRequestInterceptor: " - + interceptorClassName, - e); - } - } - - if (pipeline == null) { - throw new YarnRuntimeException( - "RequestInterceptor pipeline is not configured in the system"); - } - return pipeline; + return RouterServerUtil.createRequestInterceptorChain(conf, + YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE, + YarnConfiguration.DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS, + RESTRequestInterceptor.class); } /** * Initializes the request interceptor pipeline for the specified user. * - * @param user + * @param user specified user. */ private RequestInterceptorChainWrapper initializePipeline(String user) { synchronized (this.userPipelineMap) { @@ -265,14 +203,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) { try { // We should init the pipeline instance after it is created and then // add to the map, to ensure thread safe. - LOG.info("Initializing request processing pipeline for user: {}", user); + LOG.info("Initializing request processing pipeline for user: {}.", user); RESTRequestInterceptor interceptorChain = this.createRequestInterceptorChain(); interceptorChain.init(user); chainWrapper.init(interceptorChain); } catch (Exception e) { - LOG.error("Init RESTRequestInterceptor error for user: " + user, e); + LOG.error("Init RESTRequestInterceptor error for user: {}", user, e); throw e; } @@ -338,7 +276,7 @@ public ClusterInfo getClusterInfo() { @GET @Path(RMWSConsts.CLUSTER_USER_INFO) @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8, - MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) + MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override public ClusterUserInfo getClusterUserInfo(@Context HttpServletRequest hsr) { init(); @@ -834,10 +772,12 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext, @Override public Response listReservation( @QueryParam(RMWSConsts.QUEUE) @DefaultValue(DEFAULT_QUEUE) String queue, - @QueryParam(RMWSConsts.RESERVATION_ID) @DefaultValue(DEFAULT_RESERVATION_ID) String reservationId, + @QueryParam(RMWSConsts.RESERVATION_ID) + @DefaultValue(DEFAULT_RESERVATION_ID) String reservationId, @QueryParam(RMWSConsts.START_TIME) @DefaultValue(DEFAULT_START_TIME) long startTime, @QueryParam(RMWSConsts.END_TIME) @DefaultValue(DEFAULT_END_TIME) long endTime, - @QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, + @QueryParam(RMWSConsts.INCLUDE_RESOURCE) + @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations, @Context HttpServletRequest hsr) throws Exception { init(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java index 07ef73c3cd..867c71fa82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestRouterRMAdminService.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; @@ -185,7 +184,7 @@ public void testRouterRMAdminServiceE2E() throws Exception { */ @Test public void testUsersChainMapWithLRUCache() - throws YarnException, IOException, InterruptedException { + throws IOException, InterruptedException { Map pipelines; RequestInterceptorChainWrapper chain;