YARN-6572. Refactoring Router services to use common util classes for pipeline creations. (#4594)
This commit is contained in:
parent
92abd99450
commit
ffa9ed93a4
@ -79,7 +79,7 @@ public static NodeHeartbeatResponse newNodeHeartbeatResponse(int responseId,
|
||||
*
|
||||
* @param applicationId Application ID
|
||||
* @param credentials HDFS Tokens
|
||||
* @return systemCredentialsForAppsProto SystemCredentialsForAppsProto
|
||||
* @return systemCredentialsForAppsProto
|
||||
*/
|
||||
public static SystemCredentialsForAppsProto newSystemCredentialsForAppsProto(
|
||||
ApplicationId applicationId, ByteBuffer credentials) {
|
||||
|
@ -21,10 +21,19 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
@ -84,6 +93,74 @@ public static void logAndThrowException(String errMsg, Throwable t)
|
||||
}
|
||||
}
|
||||
|
||||
public static <R> R createRequestInterceptorChain(Configuration conf, String pipeLineClassName,
|
||||
String interceptorClassName, Class<R> clazz) {
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf,
|
||||
pipeLineClassName, interceptorClassName);
|
||||
|
||||
R pipeline = null;
|
||||
R current = null;
|
||||
|
||||
for (String className : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass = conf.getClassByName(className);
|
||||
if (clazz.isAssignableFrom(interceptorClass)) {
|
||||
Object interceptorInstance = ReflectionUtils.newInstance(interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = clazz.cast(interceptorInstance);
|
||||
current = clazz.cast(interceptorInstance);
|
||||
continue;
|
||||
} else {
|
||||
Method method = clazz.getMethod("setNextInterceptor", clazz);
|
||||
method.invoke(current, interceptorInstance);
|
||||
current = clazz.cast(interceptorInstance);
|
||||
}
|
||||
} else {
|
||||
LOG.error("Class: {} not instance of {}.", className, clazz.getCanonicalName());
|
||||
throw new YarnRuntimeException("Class: " + className + " not instance of "
|
||||
+ clazz.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.error("Could not instantiate RequestInterceptor: {}", className, e);
|
||||
throw new YarnRuntimeException("Could not instantiate RequestInterceptor: " + className, e);
|
||||
} catch (InvocationTargetException e) {
|
||||
LOG.error("RequestInterceptor {} call setNextInterceptor error.", className, e);
|
||||
throw new YarnRuntimeException("RequestInterceptor " + className
|
||||
+ " call setNextInterceptor error.", e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.error("RequestInterceptor {} does not contain the method setNextInterceptor.",
|
||||
className);
|
||||
throw new YarnRuntimeException("RequestInterceptor " + className +
|
||||
" does not contain the method setNextInterceptor.", e);
|
||||
} catch (IllegalAccessException e) {
|
||||
LOG.error("RequestInterceptor {} call the method setNextInterceptor " +
|
||||
"does not have access.", className);
|
||||
throw new YarnRuntimeException("RequestInterceptor "
|
||||
+ className + " call the method setNextInterceptor does not have access.", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system.");
|
||||
}
|
||||
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
private static List<String> getInterceptorClassNames(Configuration conf,
|
||||
String pipeLineClass, String interceptorClass) {
|
||||
String configuredInterceptorClassNames = conf.get(pipeLineClass, interceptorClass);
|
||||
List<String> interceptorClassNames = new ArrayList<>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
return interceptorClassNames;
|
||||
}
|
||||
|
||||
/**
|
||||
* Throws an IOException due to an error.
|
||||
*
|
||||
|
@ -20,10 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -33,8 +30,6 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
@ -108,8 +103,8 @@
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.slf4j.Logger;
|
||||
@ -147,7 +142,7 @@ public RouterClientRMService() {
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
LOG.info("Starting Router ClientRMService");
|
||||
LOG.info("Starting Router ClientRMService.");
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
@ -161,9 +156,7 @@ protected void serviceStart() throws Exception {
|
||||
int maxCacheSize =
|
||||
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||
this.userPipelineMap = Collections.synchronizedMap(
|
||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||
maxCacheSize, true));
|
||||
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
|
||||
|
||||
Configuration serverConf = new Configuration(conf);
|
||||
|
||||
@ -181,14 +174,13 @@ protected void serviceStart() throws Exception {
|
||||
}
|
||||
|
||||
this.server.start();
|
||||
LOG.info("Router ClientRMService listening on address: "
|
||||
+ this.server.getListenerAddress());
|
||||
LOG.info("Router ClientRMService listening on address: {}.", this.server.getListenerAddress());
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping Router ClientRMService");
|
||||
LOG.info("Stopping Router ClientRMService.");
|
||||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
@ -201,27 +193,6 @@ public Server getServer() {
|
||||
return this.server;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the comma separated interceptor class names from the configuration.
|
||||
*
|
||||
* @param conf
|
||||
* @return the interceptor class names as an instance of ArrayList
|
||||
*/
|
||||
private List<String> getInterceptorClassNames(Configuration conf) {
|
||||
String configuredInterceptorClassNames =
|
||||
conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS);
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
|
||||
return interceptorClassNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException, IOException {
|
||||
@ -507,44 +478,10 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
|
||||
@VisibleForTesting
|
||||
protected ClientRequestInterceptor createRequestInterceptorChain() {
|
||||
Configuration conf = getConfig();
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||
|
||||
ClientRequestInterceptor pipeline = null;
|
||||
ClientRequestInterceptor current = null;
|
||||
for (String interceptorClassName : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
|
||||
if (ClientRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
|
||||
ClientRequestInterceptor interceptorInstance =
|
||||
(ClientRequestInterceptor) ReflectionUtils
|
||||
.newInstance(interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = interceptorInstance;
|
||||
current = interceptorInstance;
|
||||
continue;
|
||||
} else {
|
||||
current.setNextInterceptor(interceptorInstance);
|
||||
current = interceptorInstance;
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException(
|
||||
"Class: " + interceptorClassName + " not instance of "
|
||||
+ ClientRequestInterceptor.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not instantiate ApplicationClientRequestInterceptor: "
|
||||
+ interceptorClassName,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system");
|
||||
}
|
||||
return pipeline;
|
||||
return RouterServerUtil.createRequestInterceptorChain(conf,
|
||||
YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS,
|
||||
ClientRequestInterceptor.class);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -565,15 +502,15 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
|
||||
try {
|
||||
// We should init the pipeline instance after it is created and then
|
||||
// add to the map, to ensure thread safe.
|
||||
LOG.info("Initializing request processing pipeline for application "
|
||||
+ "for the user: {}", user);
|
||||
LOG.info("Initializing request processing pipeline for application for the user: {}.",
|
||||
user);
|
||||
|
||||
ClientRequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(user);
|
||||
chainWrapper.init(interceptorChain);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Init ClientRequestInterceptor error for user: " + user, e);
|
||||
LOG.error("Init ClientRequestInterceptor error for user: {}.", user, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
@ -20,10 +20,7 @@
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
@ -34,11 +31,8 @@
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
|
||||
@ -69,6 +63,7 @@
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
import org.slf4j.Logger;
|
||||
@ -106,7 +101,7 @@ public RouterRMAdminService() {
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
LOG.info("Starting Router RMAdmin Service");
|
||||
LOG.info("Starting Router RMAdmin Service.");
|
||||
Configuration conf = getConfig();
|
||||
YarnRPC rpc = YarnRPC.create(conf);
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
@ -120,9 +115,7 @@ protected void serviceStart() throws Exception {
|
||||
int maxCacheSize =
|
||||
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||
this.userPipelineMap = Collections.synchronizedMap(
|
||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||
maxCacheSize, true));
|
||||
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
|
||||
|
||||
Configuration serverConf = new Configuration(conf);
|
||||
|
||||
@ -139,14 +132,13 @@ protected void serviceStart() throws Exception {
|
||||
}
|
||||
|
||||
this.server.start();
|
||||
LOG.info("Router RMAdminService listening on address: "
|
||||
+ this.server.getListenerAddress());
|
||||
LOG.info("Router RMAdminService listening on address: {}.", this.server.getListenerAddress());
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
LOG.info("Stopping Router RMAdminService");
|
||||
LOG.info("Stopping Router RMAdminService.");
|
||||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
@ -164,27 +156,6 @@ public Server getServer() {
|
||||
return this.server;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the comma separated interceptor class names from the configuration.
|
||||
*
|
||||
* @param conf
|
||||
* @return the interceptor class names as an instance of ArrayList
|
||||
*/
|
||||
private List<String> getInterceptorClassNames(Configuration conf) {
|
||||
String configuredInterceptorClassNames =
|
||||
conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
|
||||
return interceptorClassNames;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected RequestInterceptorChainWrapper getInterceptorChain()
|
||||
throws IOException {
|
||||
@ -215,45 +186,10 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
|
||||
@VisibleForTesting
|
||||
protected RMAdminRequestInterceptor createRequestInterceptorChain() {
|
||||
Configuration conf = getConfig();
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||
|
||||
RMAdminRequestInterceptor pipeline = null;
|
||||
RMAdminRequestInterceptor current = null;
|
||||
for (String interceptorClassName : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
|
||||
if (RMAdminRequestInterceptor.class
|
||||
.isAssignableFrom(interceptorClass)) {
|
||||
RMAdminRequestInterceptor interceptorInstance =
|
||||
(RMAdminRequestInterceptor) ReflectionUtils
|
||||
.newInstance(interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = interceptorInstance;
|
||||
current = interceptorInstance;
|
||||
continue;
|
||||
} else {
|
||||
current.setNextInterceptor(interceptorInstance);
|
||||
current = interceptorInstance;
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException(
|
||||
"Class: " + interceptorClassName + " not instance of "
|
||||
+ RMAdminRequestInterceptor.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not instantiate RMAdminRequestInterceptor: "
|
||||
+ interceptorClassName,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system");
|
||||
}
|
||||
return pipeline;
|
||||
return RouterServerUtil.createRequestInterceptorChain(conf,
|
||||
YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS,
|
||||
RMAdminRequestInterceptor.class);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -274,14 +210,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
|
||||
try {
|
||||
// We should init the pipeline instance after it is created and then
|
||||
// add to the map, to ensure thread safe.
|
||||
LOG.info("Initializing request processing pipeline for user: {}", user);
|
||||
LOG.info("Initializing request processing pipeline for user: {}.", user);
|
||||
|
||||
RMAdminRequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(user);
|
||||
chainWrapper.init(interceptorChain);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Init RMAdminRequestInterceptor error for user: " + user, e);
|
||||
LOG.error("Init RMAdminRequestInterceptor error for user: {}.", user, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
|
@ -19,10 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.router.webapp;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@ -48,11 +45,8 @@
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServiceProtocol;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
|
||||
@ -86,6 +80,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
|
||||
import org.apache.hadoop.yarn.server.router.Router;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
|
||||
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
|
||||
import org.apache.hadoop.yarn.util.LRUCacheHashMap;
|
||||
@ -136,30 +131,7 @@ public RouterWebServices(final Router router, Configuration conf) {
|
||||
int maxCacheSize =
|
||||
conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
|
||||
this.userPipelineMap = Collections.synchronizedMap(
|
||||
new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
|
||||
maxCacheSize, true));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the comma separated interceptor class names from the configuration.
|
||||
*
|
||||
* @param conf
|
||||
* @return the interceptor class names as an instance of ArrayList
|
||||
*/
|
||||
private List<String> getInterceptorClassNames(Configuration config) {
|
||||
String configuredInterceptorClassNames =
|
||||
config.get(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS);
|
||||
|
||||
List<String> interceptorClassNames = new ArrayList<String>();
|
||||
Collection<String> tempList =
|
||||
StringUtils.getStringCollection(configuredInterceptorClassNames);
|
||||
for (String item : tempList) {
|
||||
interceptorClassNames.add(item.trim());
|
||||
}
|
||||
|
||||
return interceptorClassNames;
|
||||
this.userPipelineMap = Collections.synchronizedMap(new LRUCacheHashMap<>(maxCacheSize, true));
|
||||
}
|
||||
|
||||
private void init() {
|
||||
@ -207,50 +179,16 @@ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected RESTRequestInterceptor createRequestInterceptorChain() {
|
||||
|
||||
List<String> interceptorClassNames = getInterceptorClassNames(conf);
|
||||
|
||||
RESTRequestInterceptor pipeline = null;
|
||||
RESTRequestInterceptor current = null;
|
||||
for (String interceptorClassName : interceptorClassNames) {
|
||||
try {
|
||||
Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
|
||||
if (RESTRequestInterceptor.class.isAssignableFrom(interceptorClass)) {
|
||||
RESTRequestInterceptor interceptorInstance =
|
||||
(RESTRequestInterceptor) ReflectionUtils
|
||||
.newInstance(interceptorClass, conf);
|
||||
if (pipeline == null) {
|
||||
pipeline = interceptorInstance;
|
||||
current = interceptorInstance;
|
||||
continue;
|
||||
} else {
|
||||
current.setNextInterceptor(interceptorInstance);
|
||||
current = interceptorInstance;
|
||||
}
|
||||
} else {
|
||||
throw new YarnRuntimeException(
|
||||
"Class: " + interceptorClassName + " not instance of "
|
||||
+ RESTRequestInterceptor.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException(
|
||||
"Could not instantiate RESTRequestInterceptor: "
|
||||
+ interceptorClassName,
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
if (pipeline == null) {
|
||||
throw new YarnRuntimeException(
|
||||
"RequestInterceptor pipeline is not configured in the system");
|
||||
}
|
||||
return pipeline;
|
||||
return RouterServerUtil.createRequestInterceptorChain(conf,
|
||||
YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
|
||||
YarnConfiguration.DEFAULT_ROUTER_WEBAPP_INTERCEPTOR_CLASS,
|
||||
RESTRequestInterceptor.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes the request interceptor pipeline for the specified user.
|
||||
*
|
||||
* @param user
|
||||
* @param user specified user.
|
||||
*/
|
||||
private RequestInterceptorChainWrapper initializePipeline(String user) {
|
||||
synchronized (this.userPipelineMap) {
|
||||
@ -265,14 +203,14 @@ private RequestInterceptorChainWrapper initializePipeline(String user) {
|
||||
try {
|
||||
// We should init the pipeline instance after it is created and then
|
||||
// add to the map, to ensure thread safe.
|
||||
LOG.info("Initializing request processing pipeline for user: {}", user);
|
||||
LOG.info("Initializing request processing pipeline for user: {}.", user);
|
||||
|
||||
RESTRequestInterceptor interceptorChain =
|
||||
this.createRequestInterceptorChain();
|
||||
interceptorChain.init(user);
|
||||
chainWrapper.init(interceptorChain);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Init RESTRequestInterceptor error for user: " + user, e);
|
||||
LOG.error("Init RESTRequestInterceptor error for user: {}", user, e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
@ -338,7 +276,7 @@ public ClusterInfo getClusterInfo() {
|
||||
@GET
|
||||
@Path(RMWSConsts.CLUSTER_USER_INFO)
|
||||
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
|
||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
@Override
|
||||
public ClusterUserInfo getClusterUserInfo(@Context HttpServletRequest hsr) {
|
||||
init();
|
||||
@ -834,10 +772,12 @@ public Response deleteReservation(ReservationDeleteRequestInfo resContext,
|
||||
@Override
|
||||
public Response listReservation(
|
||||
@QueryParam(RMWSConsts.QUEUE) @DefaultValue(DEFAULT_QUEUE) String queue,
|
||||
@QueryParam(RMWSConsts.RESERVATION_ID) @DefaultValue(DEFAULT_RESERVATION_ID) String reservationId,
|
||||
@QueryParam(RMWSConsts.RESERVATION_ID)
|
||||
@DefaultValue(DEFAULT_RESERVATION_ID) String reservationId,
|
||||
@QueryParam(RMWSConsts.START_TIME) @DefaultValue(DEFAULT_START_TIME) long startTime,
|
||||
@QueryParam(RMWSConsts.END_TIME) @DefaultValue(DEFAULT_END_TIME) long endTime,
|
||||
@QueryParam(RMWSConsts.INCLUDE_RESOURCE) @DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
|
||||
@QueryParam(RMWSConsts.INCLUDE_RESOURCE)
|
||||
@DefaultValue(DEFAULT_INCLUDE_RESOURCE) boolean includeResourceAllocations,
|
||||
@Context HttpServletRequest hsr) throws Exception {
|
||||
init();
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
|
||||
|
@ -23,7 +23,6 @@
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
|
||||
@ -185,7 +184,7 @@ public void testRouterRMAdminServiceE2E() throws Exception {
|
||||
*/
|
||||
@Test
|
||||
public void testUsersChainMapWithLRUCache()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
Map<String, RequestInterceptorChainWrapper> pipelines;
|
||||
RequestInterceptorChainWrapper chain;
|
||||
|
Loading…
Reference in New Issue
Block a user