diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index e53aeecbae..25513fec90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4136,11 +4136,87 @@ public static boolean isAclEnabled(Configuration conf) { public static final String ROUTER_WEBAPP_PREFIX = ROUTER_PREFIX + "webapp."; + /** + * This configurable that controls the thread pool size of the threadpool of the interceptor. + * The corePoolSize(minimumPoolSize) and maximumPoolSize of the thread pool + * are controlled by this configurable. + * In order to control the thread pool more accurately, this parameter is deprecated. + * + * corePoolSize(minimumPoolSize) use + * {@link YarnConfiguration#ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE} + * + * maximumPoolSize use + * {@link YarnConfiguration#ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE} + * + * This configurable will be deprecated. + */ public static final String ROUTER_USER_CLIENT_THREADS_SIZE = ROUTER_PREFIX + "interceptor.user.threadpool-size"; + /** + * The default value is 5. + * which means that the corePoolSize(minimumPoolSize) and maximumPoolSize + * of the thread pool are both 5s. + * + * corePoolSize(minimumPoolSize) default value use + * {@link YarnConfiguration#DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE} + * + * maximumPoolSize default value use + * {@link YarnConfiguration#DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE} + */ public static final int DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE = 5; + /** + * This configurable is used to set the corePoolSize(minimumPoolSize) + * of the thread pool of the interceptor. + * + * corePoolSize the number of threads to keep in the pool, even if they are idle. + */ + public static final String ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE = + ROUTER_PREFIX + "interceptor.user-thread-pool.minimum-pool-size"; + + /** + * This configuration is used to set the default value of corePoolSize (minimumPoolSize) + * of the thread pool of the interceptor. + * + * Default is 5. + */ + public static final int DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE = 5; + + /** + * This configurable is used to set the maximumPoolSize of the thread pool of the interceptor. + * + * maximumPoolSize the maximum number of threads to allow in the pool. + */ + public static final String ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE = + ROUTER_PREFIX + "interceptor.user-thread-pool.maximum-pool-size"; + + /** + * This configuration is used to set the default value of maximumPoolSize + * of the thread pool of the interceptor. + * + * Default is 5. + */ + public static final int DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE = 5; + + /** + * This configurable is used to set the keepAliveTime of the thread pool of the interceptor. + * + * keepAliveTime when the number of threads is greater than the core, + * this is the maximum time that excess idle threads will wait for new tasks before terminating. + */ + public static final String ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME = + ROUTER_PREFIX + "interceptor.user-thread-pool.keep-alive-time"; + + /** + * This configurable is used to set the default time of keepAliveTime + * of the thread pool of the interceptor. + * + * the default value is 0s. + */ + public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME = + TimeUnit.SECONDS.toMillis(0); // 0s + /** The address of the Router web application. */ public static final String ROUTER_WEBAPP_ADDRESS = ROUTER_WEBAPP_PREFIX + "address"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index e46473db0f..e1cc6adbe5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5047,6 +5047,35 @@ + + yarn.router.interceptor.user-thread-pool.minimum-pool-size + 5 + + This configurable is used to set the corePoolSize(minimumPoolSize) + of the thread pool of the interceptor. + Default is 5. + + + + + yarn.router.interceptor.user-thread-pool.maximum-pool-size + 5 + + This configuration is used to set the default value of maximumPoolSize + of the thread pool of the interceptor. + Default is 5. + + + + + yarn.router.interceptor.user-thread-pool.keep-alive-time + 0s + + This configurable is used to set the keepAliveTime of the thread pool of the interceptor. + Default is 0s. + + + yarn.router.submit.interval.time 10ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index faffbf602b..9bfcb04ff8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -187,15 +187,20 @@ public void init(String userName) { federationFacade = FederationStateStoreFacade.getInstance(); rand = new Random(System.currentTimeMillis()); - int numThreads = getConf().getInt( - YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, - YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE); + int numMinThreads = getNumMinThreads(getConf()); + + int numMaxThreads = getNumMaxThreads(getConf()); + + long keepAliveTime = getConf().getTimeDuration( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS); + ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("RPC Router Client-" + userName + "-%d ").build(); - BlockingQueue workQueue = new LinkedBlockingQueue<>(); - this.executorService = new ThreadPoolExecutor(numThreads, numThreads, - 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); + BlockingQueue workQueue = new LinkedBlockingQueue<>(); + this.executorService = new ThreadPoolExecutor(numMinThreads, numMaxThreads, + keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory); final Configuration conf = this.getConf(); @@ -1949,6 +1954,46 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId, } } + protected int getNumMinThreads(Configuration conf) { + + String threadSize = conf.get(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE); + + // If the user configures YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, + // we will still get the number of threads from this configuration. + if (StringUtils.isNotBlank(threadSize)) { + LOG.warn("{} is a deprecated property, " + + "please remove it, use {} to configure the minimum number of thread pool.", + YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE); + return Integer.parseInt(threadSize); + } + + int numMinThreads = conf.getInt( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE); + return numMinThreads; + } + + protected int getNumMaxThreads(Configuration conf) { + + String threadSize = conf.get(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE); + + // If the user configures YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, + // we will still get the number of threads from this configuration. + if (StringUtils.isNotBlank(threadSize)) { + LOG.warn("{} is a deprecated property, " + + "please remove it, use {} to configure the maximum number of thread pool.", + YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE, + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE); + return Integer.parseInt(threadSize); + } + + int numMaxThreads = conf.getInt( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE); + return numMaxThreads; + } + @VisibleForTesting public void setNumSubmitRetries(int numSubmitRetries) { this.numSubmitRetries = numSubmitRetries; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java index 1fc1e92033..38f571c288 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -1520,4 +1520,34 @@ private ReservationDefinition createReservationDefinition(long arrival, return ReservationDefinition.newInstance(arrival, deadline, requests, username, "0", Priority.UNDEFINED); } + + @Test + public void testGetNumMinThreads() { + // If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, + // we expect to get 5 threads + int minThreads = interceptor.getNumMinThreads(this.getConf()); + Assert.assertEquals(5, minThreads); + + // If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, + // we expect to get 3 threads + this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE); + this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MINIMUM_POOL_SIZE, 3); + int minThreads2 = interceptor.getNumMinThreads(this.getConf()); + Assert.assertEquals(3, minThreads2); + } + + @Test + public void testGetNumMaxThreads() { + // If we don't configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, + // we expect to get 5 threads + int minThreads = interceptor.getNumMaxThreads(this.getConf()); + Assert.assertEquals(5, minThreads); + + // If we configure YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, + // we expect to get 8 threads + this.getConf().unset(YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE); + this.getConf().setInt(YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_MAXIMUM_POOL_SIZE, 8); + int minThreads2 = interceptor.getNumMaxThreads(this.getConf()); + Assert.assertEquals(8, minThreads2); + } }