YARN-9332. RackResolver tool should accept multiple hosts. Contributed by Lantao Jin.

This commit is contained in:
Weiwei Yang 2019-03-02 23:19:17 +08:00 committed by Akira Ajisaka
parent b18c1c22ea
commit e20b5ef52c
No known key found for this signature in database
GPG Key ID: C1EDBB9CA400FD50
2 changed files with 126 additions and 8 deletions

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.google.common.base.Strings;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@ -86,6 +88,20 @@ public static Node resolve(Configuration conf, String hostName) {
return coreResolve(hostName);
}
/**
* Utility method for getting a list of hostname resolved to a list of node
* in the network topology. This method initializes the class with the
* right resolver implementation.
* @param conf
* @param hostNames
* @return nodes {@link Node} after resolving the hostnames
*/
public static List<Node> resolve(
Configuration conf, List<String> hostNames) {
init(conf);
return coreResolve(hostNames);
}
/**
* Utility method for getting a hostname resolved to a node in the
* network topology. This method doesn't initialize the class.
@ -100,18 +116,50 @@ public static Node resolve(String hostName) {
return coreResolve(hostName);
}
/**
* Utility method for getting a list of hostname resolved to a list of node
* in the network topology. This method doesn't initialize the class.
* Call {@link #init(Configuration)} explicitly.
* @param hostNames
* @return nodes {@link Node} after resolving the hostnames
*/
public static List<Node> resolve(List<String> hostNames) {
if (!initCalled) {
throw new IllegalStateException("RackResolver class " +
"not yet initialized");
}
return coreResolve(hostNames);
}
private static Node coreResolve(String hostName) {
List <String> tmpList = Collections.singletonList(hostName);
List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
String rName = NetworkTopology.DEFAULT_RACK;
if (rNameList == null || rNameList.get(0) == null) {
LOG.debug("Could not resolve {}. Falling back to {}", hostName,
NetworkTopology.DEFAULT_RACK);
return coreResolve(tmpList).get(0);
}
private static List<Node> coreResolve(List<String> hostNames) {
List<Node> nodes = new ArrayList<Node>(hostNames.size());
List<String> rNameList = dnsToSwitchMapping.resolve(hostNames);
if (rNameList == null || rNameList.isEmpty()) {
for (String hostName : hostNames) {
nodes.add(new NodeBase(hostName, NetworkTopology.DEFAULT_RACK));
}
LOG.info("Got an error when resolve hostNames. Falling back to "
+ NetworkTopology.DEFAULT_RACK + " for all.");
} else {
rName = rNameList.get(0);
LOG.debug("Resolved {} to {}", hostName, rName);
for (int i = 0; i < hostNames.size(); i++) {
if (Strings.isNullOrEmpty(rNameList.get(i))) {
// fallback to use default rack
nodes.add(new NodeBase(hostNames.get(i),
NetworkTopology.DEFAULT_RACK));
LOG.debug("Could not resolve {}. Falling back to {}",
hostNames.get(i), NetworkTopology.DEFAULT_RACK);
} else {
nodes.add(new NodeBase(hostNames.get(i), rNameList.get(i)));
LOG.debug("Resolved {} to {}", hostNames.get(i), rNameList.get(i));
}
}
}
return new NodeBase(hostName, rName);
return nodes;
}
/**
@ -122,4 +170,14 @@ private static Node coreResolve(String hostName) {
static DNSToSwitchMapping getDnsToSwitchMapping() {
return dnsToSwitchMapping;
}
/**
* Only used by tests.
*/
@Private
@VisibleForTesting
static void reset() {
initCalled = false;
dnsToSwitchMapping = null;
}
}

View File

@ -21,6 +21,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@ -31,6 +32,7 @@
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestRackResolver {
@ -38,6 +40,10 @@ public class TestRackResolver {
private static Log LOG = LogFactory.getLog(TestRackResolver.class);
private static final String invalidHost = "invalidHost";
@Before
public void setUp() {
RackResolver.reset();
}
public static final class MyResolver implements DNSToSwitchMapping {
@ -81,6 +87,44 @@ public void reloadCachedMappings(List<String> names) {
}
}
/**
* This class is to test the resolve method which accepts a list of hosts
* in RackResolver.
*/
public static final class MultipleResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> hostList) {
List<String> returnList = new ArrayList<String>();
if (hostList.isEmpty()) {
return returnList;
}
for (String host : hostList) {
if (host.equals(invalidHost)) {
// Simulate condition where resolving host returns empty string
returnList.add("");
}
LOG.info("Received resolve request for " + host);
if (host.startsWith("host")) {
returnList.add("/" + host.replace("host", "rack"));
}
// I should not be reached again as RackResolver is supposed to do
// caching.
}
Assert.assertEquals(returnList.size(), hostList.size());
return returnList;
}
@Override
public void reloadCachedMappings() {
// nothing to do here, since RawScriptBasedMapping has no cache.
}
@Override
public void reloadCachedMappings(List<String> names) {
}
}
@Test
public void testCaching() {
Configuration conf = new Configuration();
@ -102,4 +146,20 @@ public void testCaching() {
Assert.assertEquals(NetworkTopology.DEFAULT_RACK, node.getNetworkLocation());
}
@Test
public void testMultipleHosts() {
Configuration conf = new Configuration();
conf.setClass(
CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MultipleResolver.class,
DNSToSwitchMapping.class);
RackResolver.init(conf);
List<Node> nodes = RackResolver.resolve(
Arrays.asList("host1", invalidHost, "host2"));
Assert.assertEquals("/rack1", nodes.get(0).getNetworkLocation());
Assert.assertEquals(NetworkTopology.DEFAULT_RACK,
nodes.get(1).getNetworkLocation());
Assert.assertEquals("/rack2", nodes.get(2).getNetworkLocation());
}
}