From feb90ffcca536e7deac50976b8a8774450fe089f Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 19 May 2016 13:01:36 -0700 Subject: [PATCH] YARN-4002. Make ResourceTrackerService#nodeHeartbeat more concurrent. Contributed by Rohith Sharma K S & Zhiguo Hong --- .../apache/hadoop/util/HostsFileReader.java | 161 ++++++++++++------ .../hadoop/util/TestHostsFileReader.java | 26 +++ .../resourcemanager/NodesListManager.java | 67 ++++---- 3 files changed, 166 insertions(+), 88 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java index cac43c90e5..c5d6b869c3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java @@ -21,6 +21,9 @@ import java.io.*; import java.util.Set; import java.util.HashSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.io.Charsets; import org.apache.commons.logging.LogFactory; @@ -38,6 +41,8 @@ public class HostsFileReader { private Set excludes; private String includesFile; private String excludesFile; + private WriteLock writeLock; + private ReadLock readLock; private static final Log LOG = LogFactory.getLog(HostsFileReader.class); @@ -47,6 +52,9 @@ public HostsFileReader(String inFile, excludes = new HashSet(); includesFile = inFile; excludesFile = exFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(); } @@ -57,6 +65,9 @@ public HostsFileReader(String includesFile, InputStream inFileInputStream, excludes = new HashSet(); this.includesFile = includesFile; this.excludesFile = excludesFile; + ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + this.writeLock = rwLock.writeLock(); + this.readLock = rwLock.readLock(); refresh(inFileInputStream, exFileInputStream); } @@ -101,80 +112,126 @@ public static void readFileToSetWithFileInputStream(String type, } } - public synchronized void refresh() throws IOException { - LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (!includesFile.isEmpty()) { - readFileToSet("included", includesFile, newIncludes); - switchIncludes = true; - } - if (!excludesFile.isEmpty()) { - readFileToSet("excluded", excludesFile, newExcludes); - switchExcludes = true; + public void refresh() throws IOException { + this.writeLock.lock(); + try { + refresh(includesFile, excludesFile); + } finally { + this.writeLock.unlock(); } + } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; - } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + public void refresh(String includeFiles, String excludeFiles) + throws IOException { + LOG.info("Refreshing hosts (include/exclude) list"); + this.writeLock.lock(); + try { + // update instance variables + updateFileNames(includeFiles, excludeFiles); + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (includeFiles != null && !includeFiles.isEmpty()) { + readFileToSet("included", includeFiles, newIncludes); + switchIncludes = true; + } + if (excludeFiles != null && !excludeFiles.isEmpty()) { + readFileToSet("excluded", excludeFiles, newExcludes); + switchExcludes = true; + } + + if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } } @Private - public synchronized void refresh(InputStream inFileInputStream, + public void refresh(InputStream inFileInputStream, InputStream exFileInputStream) throws IOException { LOG.info("Refreshing hosts (include/exclude) list"); - Set newIncludes = new HashSet(); - Set newExcludes = new HashSet(); - boolean switchIncludes = false; - boolean switchExcludes = false; - if (inFileInputStream != null) { - readFileToSetWithFileInputStream("included", includesFile, - inFileInputStream, newIncludes); - switchIncludes = true; - } - if (exFileInputStream != null) { - readFileToSetWithFileInputStream("excluded", excludesFile, - exFileInputStream, newExcludes); - switchExcludes = true; - } - if (switchIncludes) { - // switch the new hosts that are to be included - includes = newIncludes; - } - if (switchExcludes) { - // switch the excluded hosts - excludes = newExcludes; + this.writeLock.lock(); + try { + Set newIncludes = new HashSet(); + Set newExcludes = new HashSet(); + boolean switchIncludes = false; + boolean switchExcludes = false; + if (inFileInputStream != null) { + readFileToSetWithFileInputStream("included", includesFile, + inFileInputStream, newIncludes); + switchIncludes = true; + } + if (exFileInputStream != null) { + readFileToSetWithFileInputStream("excluded", excludesFile, + exFileInputStream, newExcludes); + switchExcludes = true; + } + if (switchIncludes) { + // switch the new hosts that are to be included + includes = newIncludes; + } + if (switchExcludes) { + // switch the excluded hosts + excludes = newExcludes; + } + } finally { + this.writeLock.unlock(); } } - public synchronized Set getHosts() { - return includes; + public Set getHosts() { + this.readLock.lock(); + try { + return includes; + } finally { + this.readLock.unlock(); + } } - public synchronized Set getExcludedHosts() { - return excludes; + public Set getExcludedHosts() { + this.readLock.lock(); + try { + return excludes; + } finally { + this.readLock.unlock(); + } } - public synchronized void setIncludesFile(String includesFile) { + public void getHostDetails(Set includes, Set excludes) { + this.readLock.lock(); + try { + includes.addAll(this.includes); + excludes.addAll(this.excludes); + } finally { + this.readLock.unlock(); + } + } + + public void setIncludesFile(String includesFile) { LOG.info("Setting the includes file to " + includesFile); this.includesFile = includesFile; } - public synchronized void setExcludesFile(String excludesFile) { + public void setExcludesFile(String excludesFile) { LOG.info("Setting the excludes file to " + excludesFile); this.excludesFile = excludesFile; } - public synchronized void updateFileNames(String includesFile, - String excludesFile) { - setIncludesFile(includesFile); - setExcludesFile(excludesFile); + public void updateFileNames(String includeFiles, String excludeFiles) { + this.writeLock.lock(); + try { + setIncludesFile(includeFiles); + setExcludesFile(excludeFiles); + } finally { + this.writeLock.unlock(); + } } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java index 3000069ed2..8015f7a1ef 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java @@ -20,6 +20,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileWriter; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.test.GenericTestUtils; import org.junit.*; @@ -96,6 +98,30 @@ public void testHostsFileReader() throws Exception { assertTrue(hfp.getExcludedHosts().contains("somehost5")); assertFalse(hfp.getExcludedHosts().contains("host4")); + // test for refreshing hostreader wit new include/exclude host files + String newExcludesFile = HOSTS_TEST_DIR + "/dfs1.exclude"; + String newIncludesFile = HOSTS_TEST_DIR + "/dfs1.include"; + + efw = new FileWriter(newExcludesFile); + ifw = new FileWriter(newIncludesFile); + + efw.write("#DFS-Hosts-excluded\n"); + efw.write("node1\n"); + efw.close(); + + ifw.write("#Hosts-in-DFS\n"); + ifw.write("node2\n"); + ifw.close(); + + hfp.refresh(newIncludesFile, newExcludesFile); + assertTrue(hfp.getExcludedHosts().contains("node1")); + assertTrue(hfp.getHosts().contains("node2")); + + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hfp.getHostDetails(hostsList, excludeList); + assertTrue(excludeList.contains("node1")); + assertTrue(hostsList.contains("node2")); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index bb00e60a47..79373832e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -183,10 +183,15 @@ private void printConfiguredHosts() { YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" + conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH)); - for (String include : hostsReader.getHosts()) { + + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + for (String include : hostsList) { LOG.debug("include: " + include); } - for (String exclude : hostsReader.getExcludedHosts()) { + for (String exclude : excludeList) { LOG.debug("exclude: " + exclude); } } @@ -208,25 +213,17 @@ public void refreshNodes(Configuration yarnConf) throws IOException, private void refreshHostsReader(Configuration yarnConf) throws IOException, YarnException { - synchronized (hostsReader) { - if (null == yarnConf) { - yarnConf = new YarnConfiguration(); - } - includesFile = - yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); - excludesFile = - yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, - YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); - hostsReader.updateFileNames(includesFile, excludesFile); - hostsReader.refresh( - includesFile.isEmpty() ? null : this.rmContext - .getConfigurationProvider().getConfigurationInputStream( - this.conf, includesFile), excludesFile.isEmpty() ? null - : this.rmContext.getConfigurationProvider() - .getConfigurationInputStream(this.conf, excludesFile)); - printConfiguredHosts(); + if (null == yarnConf) { + yarnConf = new YarnConfiguration(); } + includesFile = + yarnConf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH); + excludesFile = + yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, + YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH); + hostsReader.refresh(includesFile, excludesFile); + printConfiguredHosts(); } private void setDecomissionedNMs() { @@ -364,13 +361,13 @@ public void run() { public boolean isValidNode(String hostName) { String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList - .contains(ip)) - && !(excludeList.contains(hostName) || excludeList.contains(ip)); - } + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList + .contains(ip)) + && !(excludeList.contains(hostName) || excludeList.contains(ip)); } @Override @@ -467,17 +464,15 @@ private void updateInactiveNodes() { } public boolean isUntrackedNode(String hostName) { - boolean untracked; String ip = resolver.resolve(hostName); - synchronized (hostsReader) { - Set hostsList = hostsReader.getHosts(); - Set excludeList = hostsReader.getExcludedHosts(); - untracked = !hostsList.isEmpty() && - !hostsList.contains(hostName) && !hostsList.contains(ip) && - !excludeList.contains(hostName) && !excludeList.contains(ip); - } - return untracked; + Set hostsList = new HashSet(); + Set excludeList = new HashSet(); + hostsReader.getHostDetails(hostsList, excludeList); + + return !hostsList.isEmpty() && !hostsList.contains(hostName) + && !hostsList.contains(ip) && !excludeList.contains(hostName) + && !excludeList.contains(ip); } /**