diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 662106bf08..98cc98f3ff 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -813,6 +813,9 @@ Release 2.8.0 - UNRELEASED
YARN-4073. Removed unused ApplicationACLsManager in ContainerManagerImpl constructor.
(Naganarasimha G R via rohithsharmaks)
+ YARN-4024. YARN RM should avoid unnecessary resolving IP when NMs doing heartbeat.
+ (Hong Zhiguo via wangda)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a18ef7c2a2..5e1bab281f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -746,6 +746,11 @@ public class YarnConfiguration extends Configuration {
+ "proxy-user-privileges.enabled";
public static final boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+ /**
+ * The expiry interval, in seconds, for the RM's cache of resolved node IP
+ * addresses. Any value less than or equal to zero (the default is -1)
+ * disables the caching.
+ */
+ public static final String RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = RM_PREFIX
+ + "node-ip-cache.expiry-interval-secs";
+ public static final int DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS = -1;
+
/**
* How many diagnostics/failure messages can be saved in RM for
* log aggregation. It also defines the number of diagnostics/failure
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 62ba599f82..436bfb04e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -272,6 +272,12 @@
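+  <property>
+    <description>The expiry interval, in seconds, for node IP caching. Any non-positive value (such as the default, -1) disables the caching.</description>
+    <name>yarn.resourcemanager.node-ip-cache.expiry-interval-secs</name>
+    <value>-1</value>
+  </property>
+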
Number of threads to handle resource tracker calls.
yarn.resourcemanager.resource-tracker.client.thread-count
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index b9c76fbe78..abea85e908 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -24,13 +24,18 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -46,9 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
@SuppressWarnings("unchecked")
-public class NodesListManager extends AbstractService implements
+public class NodesListManager extends CompositeService implements
EventHandler {
private static final Log LOG = LogFactory.getLog(NodesListManager.class);
@@ -63,6 +70,8 @@ public class NodesListManager extends AbstractService implements
private String includesFile;
private String excludesFile;
+ private Resolver resolver;
+
public NodesListManager(RMContext rmContext) {
super(NodesListManager.class.getName());
this.rmContext = rmContext;
@@ -73,6 +82,16 @@ public class NodesListManager extends AbstractService implements
this.conf = conf;
+ int nodeIpCacheTimeout = conf.getInt(
+ YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS,
+ YarnConfiguration.DEFAULT_RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS);
+ if (nodeIpCacheTimeout <= 0) {
+ resolver = new DirectResolver();
+ } else {
+ resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout);
+ addIfService(resolver);
+ }
+
// Read the hosts/exclude files to restrict access to the RM
try {
this.includesFile = conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
@@ -148,17 +167,129 @@ public class NodesListManager extends AbstractService implements
ClusterMetrics.getMetrics().setDecommisionedNMs(excludeList.size());
}
+ /**
+ * Returns the resolver this manager uses to translate host names into IP
+ * addresses. Exposed so tests can inspect which implementation was chosen.
+ */
+ @VisibleForTesting
+ public Resolver getResolver() {
+ return resolver;
+ }
+
+ /**
+ * Strategy for translating a host name into an IP address string.
+ */
+ @VisibleForTesting
+ public interface Resolver {
+ /**
+ * Tries to resolve hostName to an IP address; falls back to returning
+ * hostName itself if the resolution fails.
+ */
+ String resolve(String hostName);
+ }
+
+ /**
+ * Resolver that performs no caching: every call is delegated straight to
+ * NetUtils.normalizeHostName.
+ */
+ @VisibleForTesting
+ public static class DirectResolver implements Resolver {
+ @Override
+ public String resolve(String hostName) {
+ return NetUtils.normalizeHostName(hostName);
+ }
+ }
+
+  /**
+   * Resolver that caches the result of NetUtils.normalizeHostName per host
+   * name. Cached entries are dropped either explicitly through
+   * {@link #removeFromCache(String)} or by a background {@link Timer} task
+   * that evicts entries older than the expiry interval. The timer is created
+   * in the constructor, scheduled in serviceStart() and cancelled in
+   * serviceStop().
+   */
+  @VisibleForTesting
+  public static class CachedResolver extends AbstractService
+      implements Resolver {
+    /** A resolved address plus the clock time at which it was cached. */
+    private static class CacheEntry {
+      public final String ip;
+      public final long resolveTime;
+      public CacheEntry(String ip, long resolveTime) {
+        this.ip = ip;
+        this.resolveTime = resolveTime;
+      }
+    }
+    // host name -> cached resolution; concurrent because resolve() callers
+    // and the timer thread touch it without external locking
+    private final Map<String, CacheEntry> cache =
+        new ConcurrentHashMap<String, CacheEntry>();
+    // kept as long so that large second values cannot overflow when
+    // converted to milliseconds
+    private final long expiryIntervalMs;
+    private final long checkIntervalMs;
+    private final Clock clock;
+    private final Timer checkingTimer;
+    private final TimerTask expireChecker = new ExpireChecker();
+
+    /**
+     * @param clock source of the timestamps stored with each cache entry
+     * @param expiryIntervalSecs age, in seconds, after which an entry is
+     *        evicted by the expire checker; the checker runs every third of
+     *        this interval
+     */
+    public CachedResolver(Clock clock, int expiryIntervalSecs) {
+      super("NodesListManager.CachedResolver");
+      this.clock = clock;
+      // 1000L forces long arithmetic: an int multiply would wrap around for
+      // intervals above Integer.MAX_VALUE / 1000 seconds
+      this.expiryIntervalMs = expiryIntervalSecs * 1000L;
+      this.checkIntervalMs = expiryIntervalMs / 3;
+      // second argument 'true' makes the timer thread a daemon
+      checkingTimer = new Timer(
+          "Timer-NodesListManager.CachedResolver.ExpireChecker", true);
+    }
+
+    /** Schedules the periodic expiry scan. */
+    @Override
+    protected void serviceStart() throws Exception {
+      checkingTimer.scheduleAtFixedRate(
+          expireChecker, checkIntervalMs, checkIntervalMs);
+      super.serviceStart();
+    }
+
+    /** Cancels the timer so no further expiry scans run. */
+    @Override
+    protected void serviceStop() throws Exception {
+      checkingTimer.cancel();
+      super.serviceStop();
+    }
+
+    /**
+     * Stores (or overwrites) the mapping for hostName, stamped with the
+     * current clock time.
+     */
+    @VisibleForTesting
+    public void addToCache(String hostName, String ip) {
+      cache.put(hostName, new CacheEntry(ip, clock.getTime()));
+    }
+
+    /** Drops any cached mapping for hostName. */
+    public void removeFromCache(String hostName) {
+      cache.remove(hostName);
+    }
+
+    // resolve hostName afresh and remember the result
+    private String reload(String hostName) {
+      String ip = NetUtils.normalizeHostName(hostName);
+      addToCache(hostName, ip);
+      return ip;
+    }
+
+    /**
+     * Returns the cached value for hostName when one exists, otherwise
+     * resolves it, caches the result and returns it. The age of a cached
+     * entry is not checked here; eviction is left to the expire checker.
+     */
+    @Override
+    public String resolve(String hostName) {
+      CacheEntry e = cache.get(hostName);
+      if (e != null) {
+        return e.ip;
+      }
+      return reload(hostName);
+    }
+
+    /** Exposes the expiry task so tests can run it synchronously. */
+    @VisibleForTesting
+    public TimerTask getExpireChecker() {
+      return expireChecker;
+    }
+
+    /**
+     * Timer task that walks the cache and removes every entry whose age
+     * exceeds the expiry interval.
+     */
+    private class ExpireChecker extends TimerTask {
+      @Override
+      public void run() {
+        long currentTime = clock.getTime();
+        Iterator<Map.Entry<String, CacheEntry>> iterator =
+            cache.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<String, CacheEntry> entry = iterator.next();
+          if (currentTime >
+              entry.getValue().resolveTime +
+                  CachedResolver.this.expiryIntervalMs) {
+            iterator.remove();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[" + entry.getKey() + ":" + entry.getValue().ip +
+                  "] Expired after " +
+                  CachedResolver.this.expiryIntervalMs / 1000 + " secs");
+            }
+          }
+        }
+      }
+    }
+  }
+
+ /**
+ * Decides whether a node may be used, based on the include and exclude
+ * host lists. A node is valid when the include list is empty or contains
+ * its host name or resolved IP, and the exclude list contains neither.
+ *
+ * @param hostName host name of the node being checked
+ * @return true if the node passes the include/exclude check
+ */
public boolean isValidNode(String hostName) {
+ // Resolve before entering the synchronized block, so the lookup is not
+ // performed while the hostsReader monitor is held.
+ String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set hostsList = hostsReader.getHosts();
Set excludeList = hostsReader.getExcludedHosts();
- String ip = NetUtils.normalizeHostName(hostName);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
}
-
+
/**
* Provides the currently unusable nodes. Copies it into provided collection.
* @param unUsableNodes
@@ -207,6 +338,11 @@ public class NodesListManager extends AbstractService implements
default:
LOG.error("Ignoring invalid eventtype " + event.getType());
}
+ // remove the cache of normalized hostname if enabled
+ if (resolver instanceof CachedResolver) {
+ ((CachedResolver)resolver).removeFromCache(
+ eventNode.getNodeID().getHost());
+ }
}
private void disableHostsFileReader(Exception ex) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
index 5330976480..2f57dbf932 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.util.ControlledClock;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -130,6 +132,106 @@ public class TestNodesListManager {
}
+  /**
+   * Exercises CachedResolver on its own: cache hits, explicit removal, and
+   * expiry driven by a controlled clock plus a manual run of the expire
+   * checker.
+   */
+  @Test
+  public void testCachedResolver() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    final int CACHE_EXPIRY_INTERVAL_SECS = 30;
+    NodesListManager.CachedResolver resolver =
+        new NodesListManager.CachedResolver(clock, CACHE_EXPIRY_INTERVAL_SECS);
+    resolver.init(new YarnConfiguration());
+    resolver.start();
+    try {
+      resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+
+      resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      // test removeFromCache
+      resolver.removeFromCache("testCachedResolverHost1");
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      // test expiry
+      clock.tickMsec(CACHE_EXPIRY_INTERVAL_SECS * 1000 + 1);
+      resolver.getExpireChecker().run();
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertNotEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+    } finally {
+      // stop the service so its expiry Timer is cancelled even when an
+      // assertion above fails
+      resolver.stop();
+    }
+  }
+
+  /**
+   * With no cache expiry interval configured, NodesListManager must pick
+   * the non-caching DirectResolver.
+   */
+  @Test
+  public void testDefaultResolver() throws Exception {
+    LogManager.getRootLogger().setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+
+    NodesListManager.Resolver resolver =
+        rm.getNodesListManager().getResolver();
+    boolean isDirect = resolver instanceof NodesListManager.DirectResolver;
+    Assert.assertTrue("default resolver should be DirectResolver", isDirect);
+  }
+
+  /**
+   * Verifies that, with caching enabled, handling a NodesListManagerEvent
+   * for a node drops that node's host from the resolver cache while leaving
+   * other hosts cached.
+   */
+  @Test
+  public void testCachedResolverWithEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.RM_NODE_IP_CACHE_EXPIRY_INTERVAL_SECS, 30);
+
+    MockRM rm = new MockRM(conf);
+    rm.init(conf);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    nodesListManager.init(conf);
+    nodesListManager.start();
+    try {
+      NodesListManager.CachedResolver resolver =
+          (NodesListManager.CachedResolver)nodesListManager.getResolver();
+
+      resolver.addToCache("testCachedResolverHost1", "1.1.1.1");
+      resolver.addToCache("testCachedResolverHost2", "1.1.1.2");
+      Assert.assertEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      RMNode rmnode1 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+          1, "testCachedResolverHost1", 1234);
+      RMNode rmnode2 = MockNodes.newNodeInfo(1, Resource.newInstance(28000, 8),
+          1, "testCachedResolverHost2", 1234);
+
+      nodesListManager.handle(
+          new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+              rmnode1));
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+
+      nodesListManager.handle(
+          new NodesListManagerEvent(NodesListManagerEventType.NODE_USABLE,
+              rmnode2));
+      Assert.assertNotEquals("1.1.1.1",
+          resolver.resolve("testCachedResolverHost1"));
+      Assert.assertNotEquals("1.1.1.2",
+          resolver.resolve("testCachedResolverHost2"));
+    } finally {
+      // pair the explicit start() above with a stop() so the cached
+      // resolver's expiry Timer is cancelled even if an assertion fails
+      nodesListManager.stop();
+    }
+  }
+
/*
* Create dispatcher object
*/