YARN-4002. Make ResourceTrackerService#nodeHeartbeat more concurrent. Contributed by Rohith Sharma K S & Zhiguo Hong

This commit is contained in:
Jian He 2016-05-19 13:01:36 -07:00
parent 141873ca7d
commit feb90ffcca
3 changed files with 166 additions and 88 deletions

View File

@ -21,6 +21,9 @@
import java.io.*;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.io.Charsets;
import org.apache.commons.logging.LogFactory;
@ -38,6 +41,8 @@ public class HostsFileReader {
private Set<String> excludes;
private String includesFile;
private String excludesFile;
private WriteLock writeLock;
private ReadLock readLock;
private static final Log LOG = LogFactory.getLog(HostsFileReader.class);
@ -47,6 +52,9 @@ public HostsFileReader(String inFile,
excludes = new HashSet<String>();
includesFile = inFile;
excludesFile = exFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.writeLock = rwLock.writeLock();
this.readLock = rwLock.readLock();
refresh();
}
@ -57,6 +65,9 @@ public HostsFileReader(String includesFile, InputStream inFileInputStream,
excludes = new HashSet<String>();
this.includesFile = includesFile;
this.excludesFile = excludesFile;
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.writeLock = rwLock.writeLock();
this.readLock = rwLock.readLock();
refresh(inFileInputStream, exFileInputStream);
}
@ -101,18 +112,32 @@ public static void readFileToSetWithFileInputStream(String type,
}
}
public synchronized void refresh() throws IOException {
public void refresh() throws IOException {
this.writeLock.lock();
try {
refresh(includesFile, excludesFile);
} finally {
this.writeLock.unlock();
}
}
public void refresh(String includeFiles, String excludeFiles)
throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
this.writeLock.lock();
try {
// update instance variables
updateFileNames(includeFiles, excludeFiles);
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
boolean switchIncludes = false;
boolean switchExcludes = false;
if (!includesFile.isEmpty()) {
readFileToSet("included", includesFile, newIncludes);
if (includeFiles != null && !includeFiles.isEmpty()) {
readFileToSet("included", includeFiles, newIncludes);
switchIncludes = true;
}
if (!excludesFile.isEmpty()) {
readFileToSet("excluded", excludesFile, newExcludes);
if (excludeFiles != null && !excludeFiles.isEmpty()) {
readFileToSet("excluded", excludeFiles, newExcludes);
switchExcludes = true;
}
@ -124,12 +149,17 @@ public synchronized void refresh() throws IOException {
// switch the excluded hosts
excludes = newExcludes;
}
} finally {
this.writeLock.unlock();
}
}
@Private
public synchronized void refresh(InputStream inFileInputStream,
public void refresh(InputStream inFileInputStream,
InputStream exFileInputStream) throws IOException {
LOG.info("Refreshing hosts (include/exclude) list");
this.writeLock.lock();
try {
Set<String> newIncludes = new HashSet<String>();
Set<String> newExcludes = new HashSet<String>();
boolean switchIncludes = false;
@ -152,29 +182,56 @@ public synchronized void refresh(InputStream inFileInputStream,
// switch the excluded hosts
excludes = newExcludes;
}
} finally {
this.writeLock.unlock();
}
}
public synchronized Set<String> getHosts() {
public Set<String> getHosts() {
this.readLock.lock();
try {
return includes;
} finally {
this.readLock.unlock();
}
}
public synchronized Set<String> getExcludedHosts() {
public Set<String> getExcludedHosts() {
this.readLock.lock();
try {
return excludes;
} finally {
this.readLock.unlock();
}
}
public synchronized void setIncludesFile(String includesFile) {
public void getHostDetails(Set<String> includes, Set<String> excludes) {
this.readLock.lock();
try {
includes.addAll(this.includes);
excludes.addAll(this.excludes);
} finally {
this.readLock.unlock();
}
}
public void setIncludesFile(String includesFile) {
LOG.info("Setting the includes file to " + includesFile);
this.includesFile = includesFile;
}
public synchronized void setExcludesFile(String excludesFile) {
public void setExcludesFile(String excludesFile) {
LOG.info("Setting the excludes file to " + excludesFile);
this.excludesFile = excludesFile;
}
public synchronized void updateFileNames(String includesFile,
String excludesFile) {
setIncludesFile(includesFile);
setExcludesFile(excludesFile);
public void updateFileNames(String includeFiles, String excludeFiles) {
this.writeLock.lock();
try {
setIncludesFile(includeFiles);
setExcludesFile(excludeFiles);
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -20,6 +20,8 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.*;
@ -96,6 +98,30 @@ public void testHostsFileReader() throws Exception {
assertTrue(hfp.getExcludedHosts().contains("somehost5"));
assertFalse(hfp.getExcludedHosts().contains("host4"));
// test for refreshing hostreader wit new include/exclude host files
String newExcludesFile = HOSTS_TEST_DIR + "/dfs1.exclude";
String newIncludesFile = HOSTS_TEST_DIR + "/dfs1.include";
efw = new FileWriter(newExcludesFile);
ifw = new FileWriter(newIncludesFile);
efw.write("#DFS-Hosts-excluded\n");
efw.write("node1\n");
efw.close();
ifw.write("#Hosts-in-DFS\n");
ifw.write("node2\n");
ifw.close();
hfp.refresh(newIncludesFile, newExcludesFile);
assertTrue(hfp.getExcludedHosts().contains("node1"));
assertTrue(hfp.getHosts().contains("node2"));
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hfp.getHostDetails(hostsList, excludeList);
assertTrue(excludeList.contains("node1"));
assertTrue(hostsList.contains("node2"));
}
/*

View File

@ -183,10 +183,15 @@ private void printConfiguredHosts() {
YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
for (String include : hostsReader.getHosts()) {
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
for (String include : hostsList) {
LOG.debug("include: " + include);
}
for (String exclude : hostsReader.getExcludedHosts()) {
for (String exclude : excludeList) {
LOG.debug("exclude: " + exclude);
}
}
@ -208,7 +213,6 @@ public void refreshNodes(Configuration yarnConf) throws IOException,
private void refreshHostsReader(Configuration yarnConf) throws IOException,
YarnException {
synchronized (hostsReader) {
if (null == yarnConf) {
yarnConf = new YarnConfiguration();
}
@ -218,16 +222,9 @@ private void refreshHostsReader(Configuration yarnConf) throws IOException,
excludesFile =
yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
hostsReader.updateFileNames(includesFile, excludesFile);
hostsReader.refresh(
includesFile.isEmpty() ? null : this.rmContext
.getConfigurationProvider().getConfigurationInputStream(
this.conf, includesFile), excludesFile.isEmpty() ? null
: this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(this.conf, excludesFile));
hostsReader.refresh(includesFile, excludesFile);
printConfiguredHosts();
}
}
private void setDecomissionedNMs() {
Set<String> excludeList = hostsReader.getExcludedHosts();
@ -364,14 +361,14 @@ public void run() {
public boolean isValidNode(String hostName) {
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
.contains(ip))
&& !(excludeList.contains(hostName) || excludeList.contains(ip));
}
}
@Override
public void handle(NodesListManagerEvent event) {
@ -467,17 +464,15 @@ private void updateInactiveNodes() {
}
public boolean isUntrackedNode(String hostName) {
boolean untracked;
String ip = resolver.resolve(hostName);
synchronized (hostsReader) {
Set<String> hostsList = hostsReader.getHosts();
Set<String> excludeList = hostsReader.getExcludedHosts();
untracked = !hostsList.isEmpty() &&
!hostsList.contains(hostName) && !hostsList.contains(ip) &&
!excludeList.contains(hostName) && !excludeList.contains(ip);
}
return untracked;
Set<String> hostsList = new HashSet<String>();
Set<String> excludeList = new HashSet<String>();
hostsReader.getHostDetails(hostsList, excludeList);
return !hostsList.isEmpty() && !hostsList.contains(hostName)
&& !hostsList.contains(ip) && !excludeList.contains(hostName)
&& !excludeList.contains(ip);
}
/**