diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 90a8978a22..2a20451922 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4369,6 +4369,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME = TimeUnit.SECONDS.toMillis(0); // 0s + /** + * This method configures the policy for core threads regarding termination + * when no tasks arrive within the keep-alive time. + * When set to false, core threads are never terminated due to a lack of tasks. + * When set to true, the same keep-alive policy + * that applies to non-core threads also applies to core threads. + * To prevent constant thread replacement, + * ensure that the keep-alive time is greater than zero when setting it to true. + * It's advisable to call this method before the pool becomes actively used. + */ + public static final String ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = + ROUTER_PREFIX + "interceptor.user-thread-pool.allow-core-thread-time-out"; + + public static final boolean DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = + false; + /** The address of the Router web application. */ public static final String ROUTER_WEBAPP_ADDRESS = ROUTER_WEBAPP_PREFIX + "address"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 9991e841d7..72e8cc70f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -5139,6 +5139,23 @@ + + yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out + false + + This method configures the policy for core threads regarding termination + when no tasks arrive within the keep-alive time. + When set to false, core threads are never terminated due to a lack of tasks. + When set to true, the same keep-alive policy + that applies to non-core threads also applies to core threads. + To prevent constant thread replacement, + ensure that the keep-alive time is greater than zero when setting it to true. + It's advisable to call this method before the pool becomes actively used. + We need to ensure that + yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0. + + + yarn.router.submit.interval.time 10ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 9c3f9971d8..35b3e6eeb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -231,7 +231,13 @@ public void init(String userName) { keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory); // Adding this line so that unused user threads will exit and be cleaned up if idle for too long - this.executorService.allowCoreThreadTimeOut(true); + boolean allowCoreThreadTimeOut = getConf().getBoolean( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); + + if (keepAliveTime > 0 && allowCoreThreadTimeOut) { + this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + } final Configuration conf = this.getConf(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index b7c1462a60..d269cfe097 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -130,9 +130,21 @@ public void init(String userName) { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build(); + long keepAliveTime = getConf().getTimeDuration( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS); + BlockingQueue workQueue = new LinkedBlockingQueue<>(); this.executorService = new ThreadPoolExecutor(numThreads, numThreads, - 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); + keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory); + + boolean allowCoreThreadTimeOut = getConf().getBoolean( + YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT, + YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT); + + if (keepAliveTime > 0 && allowCoreThreadTimeOut) { + this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut); + } federationFacade = FederationStateStoreFacade.getInstance(this.getConf()); this.conf = this.getConf();