From f44cf99599119b5e989be724eeab447b2dc4fe53 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 24 Oct 2014 23:05:16 -0700 Subject: [PATCH] YARN-2314. Disable ContainerManagementProtocolProxy cache by default to prevent creating thousands of threads in a large cluster. Contributed by Jason Lowe --- hadoop-yarn-project/CHANGES.txt | 4 + .../hadoop/yarn/conf/YarnConfiguration.java | 40 +++++++--- .../ContainerManagementProtocolProxy.java | 80 +++++++++++++++---- .../src/main/resources/yarn-default.xml | 34 ++++---- 4 files changed, 115 insertions(+), 43 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 26ca5d040c..0b620ec198 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -746,6 +746,10 @@ Release 2.6.0 - UNRELEASED to contact with AM before AM actually receives the ClientToAMTokenMasterKey. (Jason Lowe via jianhe) + YARN-2314. Disable ContainerManagementProtocolProxy cache by default to + prevent creating thousands of threads in a large cluster. (Jason Lowe via + jianhe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 59e303ffa7..2d08fdeb22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -70,10 +70,18 @@ public class YarnConfiguration extends Configuration { public static final int APPLICATION_MAX_TAG_LENGTH = 100; static { + addDeprecatedKeys(); Configuration.addDefaultResource(YARN_DEFAULT_CONFIGURATION_FILE); Configuration.addDefaultResource(YARN_SITE_CONFIGURATION_FILE); } + private static void addDeprecatedKeys() { + Configuration.addDeprecations(new DeprecationDelta[] { + new DeprecationDelta("yarn.client.max-nodemanagers-proxies", + NM_CLIENT_MAX_NM_PROXIES) + }); + } + //Configurations public static final String YARN_PREFIX = "yarn."; @@ -1446,21 +1454,27 @@ public class YarnConfiguration extends Configuration { public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500; /** - * Maximum number of proxy connections for node manager. It should always be - * more than 1. NMClient and MRAppMaster will use this to cache connection - * with node manager. There will be at max one connection per node manager. - * Ex. configuring it to a value of 5 will make sure that client will at - * max have 5 connections cached with 5 different node managers. These - * connections will be timed out if idle for more than system wide idle - * timeout period. The token if used for authentication then it will be used - * only at connection creation time. If new token is received then earlier - * connection should be closed in order to use newer token. - * Note: {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} - * are related to each other. + * Maximum number of proxy connections to cache for node managers. If set + * to a value greater than zero then the cache is enabled and the NMClient + * and MRAppMaster will cache the specified number of node manager proxies. + * There will be at max one proxy per node manager. Ex. configuring it to a + * value of 5 will make sure that client will at max have 5 proxies cached + * with 5 different node managers. These connections for these proxies will + * be timed out if idle for more than the system wide idle timeout period. + * Note that this could cause issues on large clusters as many connections + * could linger simultaneously and lead to a large number of connection + * threads. The token used for authentication will be used only at + * connection creation time. If a new token is received then the earlier + * connection should be closed in order to use the new token. This and + * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} are related + * and should be in sync (no need for them to be equal). + * If the value of this property is zero then the connection cache is + * disabled and connections will use a zero idle timeout to prevent too + * many connection threads on large clusters. */ public static final String NM_CLIENT_MAX_NM_PROXIES = - YARN_PREFIX + "client.max-nodemanagers-proxies"; - public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500; + YARN_PREFIX + "client.max-cached-nodemanagers-proxies"; + public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 0; /** Max time to wait to establish a connection to NM */ public static final String CLIENT_NM_CONNECT_MAX_WAIT_MS = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java index daeae92709..eaf048d79d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java @@ -20,14 +20,17 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -53,7 +56,7 @@ public class ContainerManagementProtocolProxy { static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class); private final int maxConnectedNMs; - private final LinkedHashMap cmProxy; + private final Map cmProxy; private final Configuration conf; private final YarnRPC rpc; private NMTokenCache nmTokenCache; @@ -70,16 +73,25 @@ public ContainerManagementProtocolProxy(Configuration conf, maxConnectedNMs = conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES, YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES); - if (maxConnectedNMs < 1) { + if (maxConnectedNMs < 0) { throw new YarnRuntimeException( YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES - + " (" + maxConnectedNMs + ") can not be less than 1."); + + " (" + maxConnectedNMs + ") can not be less than 0."); } LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : " + maxConnectedNMs); - cmProxy = - new LinkedHashMap(); + if (maxConnectedNMs > 0) { + cmProxy = + new LinkedHashMap(); + } else { + cmProxy = Collections.emptyMap(); + // Connections are not being cached so ensure connections close quickly + // to avoid creating thousands of RPC client threads on large clusters. + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, + 0); + } rpc = YarnRPC.create(conf); } @@ -117,13 +129,9 @@ public synchronized ContainerManagementProtocolProxyData getProxy( proxy = new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr, containerId, nmTokenCache.getToken(containerManagerBindAddr)); - if (cmProxy.size() > maxConnectedNMs) { - // Number of existing proxy exceed the limit. - String cmAddr = cmProxy.keySet().iterator().next(); - removeProxy(cmProxy.get(cmAddr)); + if (maxConnectedNMs > 0) { + addProxyToCache(containerManagerBindAddr, proxy); } - - cmProxy.put(containerManagerBindAddr, proxy); } // This is to track active users of this proxy. proxy.activeCallers++; @@ -131,15 +139,52 @@ public synchronized ContainerManagementProtocolProxyData getProxy( return proxy; } + + private void addProxyToCache(String containerManagerBindAddr, + ContainerManagementProtocolProxyData proxy) { + while (cmProxy.size() >= maxConnectedNMs) { + if (LOG.isDebugEnabled()) { + LOG.debug("Cleaning up the proxy cache, size=" + cmProxy.size() + + " max=" + maxConnectedNMs); + } + boolean removedProxy = false; + for (ContainerManagementProtocolProxyData otherProxy : cmProxy.values()) { + removedProxy = removeProxy(otherProxy); + if (removedProxy) { + break; + } + } + if (!removedProxy) { + // all of the proxies are currently in use and already scheduled + // for removal, so we need to wait until at least one of them closes + try { + this.wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + if (maxConnectedNMs > 0) { + cmProxy.put(containerManagerBindAddr, proxy); + } + } private void updateLRUCache(String containerManagerBindAddr) { - ContainerManagementProtocolProxyData proxy = - cmProxy.remove(containerManagerBindAddr); - cmProxy.put(containerManagerBindAddr, proxy); + if (maxConnectedNMs > 0) { + ContainerManagementProtocolProxyData proxy = + cmProxy.remove(containerManagerBindAddr); + cmProxy.put(containerManagerBindAddr, proxy); + } } public synchronized void mayBeCloseProxy( ContainerManagementProtocolProxyData proxy) { + tryCloseProxy(proxy); + } + + private boolean tryCloseProxy( + ContainerManagementProtocolProxyData proxy) { proxy.activeCallers--; if (proxy.scheduledForClose && proxy.activeCallers < 0) { LOG.info("Closing proxy : " + proxy.containerManagerBindAddr); @@ -149,15 +194,18 @@ public synchronized void mayBeCloseProxy( } finally { this.notifyAll(); } + return true; } + return false; } - private synchronized void removeProxy( + private synchronized boolean removeProxy( ContainerManagementProtocolProxyData proxy) { if (!proxy.scheduledForClose) { proxy.scheduledForClose = true; - mayBeCloseProxy(proxy); + return tryCloseProxy(proxy); } + return false; } public synchronized void stopAllProxies() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 3c3d7e3ad2..a64ed73eef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1095,21 +1095,27 @@ - - Maximum number of proxy connections for node manager. It should always be - more than 1. NMClient and MRAppMaster will use this to cache connection - with node manager. There will be at max one connection per node manager. - Ex. configuring it to a value of 5 will make sure that client will at - max have 5 connections cached with 5 different node managers. These - connections will be timed out if idle for more than system wide idle - timeout period. The token if used for authentication then it will be used - only at connection creation time. If new token is received then earlier - connection should be closed in order to use newer token. This and + + Maximum number of proxy connections to cache for node managers. If set + to a value greater than zero then the cache is enabled and the NMClient + and MRAppMaster will cache the specified number of node manager proxies. + There will be at max one proxy per node manager. Ex. configuring it to a + value of 5 will make sure that client will at max have 5 proxies cached + with 5 different node managers. These connections for these proxies will + be timed out if idle for more than the system wide idle timeout period. + Note that this could cause issues on large clusters as many connections + could linger simultaneously and lead to a large number of connection + threads. The token used for authentication will be used only at + connection creation time. If a new token is received then the earlier + connection should be closed in order to use the new token. This and (yarn.client.nodemanager-client-async.thread-pool-max-size) are related - and should be sync (no need for them to be equal). - - yarn.client.max-nodemanagers-proxies - 500 + and should be in sync (no need for them to be equal). + If the value of this property is zero then the connection cache is + disabled and connections will use a zero idle timeout to prevent too + many connection threads on large clusters. + + yarn.client.max-cached-nodemanagers-proxies + 0