HDFS-17333. DFSClient supports lazy resolution from hostname to IP. (#6430)
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
This commit is contained in:
parent
a0ce2170db
commit
a6aa2925fb
@ -163,6 +163,10 @@ public static InetSocketAddress createSocketAddr(String target) {
|
|||||||
return createSocketAddr(target, -1);
|
return createSocketAddr(target, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static InetSocketAddress createSocketAddrUnresolved(String target) {
|
||||||
|
return createSocketAddr(target, -1, null, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Util method to build socket addr from either.
|
* Util method to build socket addr from either.
|
||||||
* {@literal <host>}
|
* {@literal <host>}
|
||||||
@ -219,6 +223,12 @@ public static InetSocketAddress createSocketAddr(String target,
|
|||||||
int defaultPort,
|
int defaultPort,
|
||||||
String configName,
|
String configName,
|
||||||
boolean useCacheIfPresent) {
|
boolean useCacheIfPresent) {
|
||||||
|
return createSocketAddr(target, defaultPort, configName, useCacheIfPresent, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static InetSocketAddress createSocketAddr(
|
||||||
|
String target, int defaultPort, String configName,
|
||||||
|
boolean useCacheIfPresent, boolean isResolved) {
|
||||||
String helpText = "";
|
String helpText = "";
|
||||||
if (configName != null) {
|
if (configName != null) {
|
||||||
helpText = " (configuration property '" + configName + "')";
|
helpText = " (configuration property '" + configName + "')";
|
||||||
@ -244,8 +254,11 @@ public static InetSocketAddress createSocketAddr(String target,
|
|||||||
"Does not contain a valid host:port authority: " + target + helpText
|
"Does not contain a valid host:port authority: " + target + helpText
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
if (isResolved) {
|
||||||
return createSocketAddrForHost(host, port);
|
return createSocketAddrForHost(host, port);
|
||||||
}
|
}
|
||||||
|
return InetSocketAddress.createUnresolved(host, port);
|
||||||
|
}
|
||||||
|
|
||||||
private static final long URI_CACHE_SIZE_DEFAULT = 1000;
|
private static final long URI_CACHE_SIZE_DEFAULT = 1000;
|
||||||
private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
|
private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
|
||||||
|
@ -107,6 +107,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED_DEFAULT;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DFSUtilClient {
|
public class DFSUtilClient {
|
||||||
@ -530,12 +532,19 @@ public static Map<String, InetSocketAddress> getAddressesForNameserviceId(
|
|||||||
String suffix = concatSuffixes(nsId, nnId);
|
String suffix = concatSuffixes(nsId, nnId);
|
||||||
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
|
String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
|
||||||
if (address != null) {
|
if (address != null) {
|
||||||
InetSocketAddress isa = NetUtils.createSocketAddr(address);
|
InetSocketAddress isa = null;
|
||||||
|
// There is no need to resolve host->ip in advance.
|
||||||
|
// Delay the resolution until the host is used.
|
||||||
|
if (conf.getBoolean(DFS_CLIENT_LAZY_RESOLVED, DFS_CLIENT_LAZY_RESOLVED_DEFAULT)) {
|
||||||
|
isa = NetUtils.createSocketAddrUnresolved(address);
|
||||||
|
}else {
|
||||||
|
isa = NetUtils.createSocketAddr(address);
|
||||||
if (isa.isUnresolved()) {
|
if (isa.isUnresolved()) {
|
||||||
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
|
LOG.warn("Namenode for {} remains unresolved for ID {}. Check your "
|
||||||
+ "hdfs-site.xml file to ensure namenodes are configured "
|
+ "hdfs-site.xml file to ensure namenodes are configured "
|
||||||
+ "properly.", nsId, nnId);
|
+ "properly.", nsId, nnId);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
ret.put(nnId, isa);
|
ret.put(nnId, isa);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -397,6 +397,8 @@ interface Failover {
|
|||||||
String RESOLVE_SERVICE_KEY = PREFIX + "resolver.impl";
|
String RESOLVE_SERVICE_KEY = PREFIX + "resolver.impl";
|
||||||
String RESOLVE_ADDRESS_TO_FQDN = PREFIX + "resolver.useFQDN";
|
String RESOLVE_ADDRESS_TO_FQDN = PREFIX + "resolver.useFQDN";
|
||||||
boolean RESOLVE_ADDRESS_TO_FQDN_DEFAULT = true;
|
boolean RESOLVE_ADDRESS_TO_FQDN_DEFAULT = true;
|
||||||
|
String DFS_CLIENT_LAZY_RESOLVED = PREFIX + "lazy.resolved";
|
||||||
|
boolean DFS_CLIENT_LAZY_RESOLVED_DEFAULT = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** dfs.client.write configuration properties */
|
/** dfs.client.write configuration properties */
|
||||||
|
@ -37,10 +37,14 @@
|
|||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
import org.apache.hadoop.net.DomainNameResolver;
|
import org.apache.hadoop.net.DomainNameResolver;
|
||||||
import org.apache.hadoop.net.DomainNameResolverFactory;
|
import org.apache.hadoop.net.DomainNameResolverFactory;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED;
|
||||||
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED_DEFAULT;
|
||||||
|
|
||||||
public abstract class AbstractNNFailoverProxyProvider<T> implements
|
public abstract class AbstractNNFailoverProxyProvider<T> implements
|
||||||
FailoverProxyProvider <T> {
|
FailoverProxyProvider <T> {
|
||||||
protected static final Logger LOG =
|
protected static final Logger LOG =
|
||||||
@ -138,6 +142,10 @@ public void setCachedState(HAServiceState state) {
|
|||||||
public HAServiceState getCachedState() {
|
public HAServiceState getCachedState() {
|
||||||
return cachedState;
|
return cachedState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAddress(InetSocketAddress address) {
|
||||||
|
this.address = address;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -152,6 +160,24 @@ protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
|
|||||||
if (pi.proxy == null) {
|
if (pi.proxy == null) {
|
||||||
assert pi.getAddress() != null : "Proxy address is null";
|
assert pi.getAddress() != null : "Proxy address is null";
|
||||||
try {
|
try {
|
||||||
|
InetSocketAddress address = pi.getAddress();
|
||||||
|
// If the host is not resolved to IP and lazy.resolved=true,
|
||||||
|
// the host needs to be resolved.
|
||||||
|
if (address.isUnresolved()) {
|
||||||
|
if (conf.getBoolean(DFS_CLIENT_LAZY_RESOLVED, DFS_CLIENT_LAZY_RESOLVED_DEFAULT)) {
|
||||||
|
InetSocketAddress isa =
|
||||||
|
NetUtils.createSocketAddrForHost(address.getHostName(), address.getPort());
|
||||||
|
if (isa.isUnresolved()) {
|
||||||
|
LOG.warn("Can not resolve host {}, check your hdfs-site.xml file " +
|
||||||
|
"to ensure host are configured correctly.", address.getHostName());
|
||||||
|
}
|
||||||
|
pi.setAddress(isa);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Lazy resolve host {} -> {}, when create proxy if needed.",
|
||||||
|
address.toString(), pi.getAddress().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
pi.proxy = factory.createProxy(conf,
|
pi.proxy = factory.createProxy(conf,
|
||||||
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
|
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -60,6 +61,7 @@ public class TestConfiguredFailoverProxyProvider {
|
|||||||
private URI ns1Uri;
|
private URI ns1Uri;
|
||||||
private URI ns2Uri;
|
private URI ns2Uri;
|
||||||
private URI ns3Uri;
|
private URI ns3Uri;
|
||||||
|
private URI ns4Uri;
|
||||||
private String ns1;
|
private String ns1;
|
||||||
private String ns1nn1Hostname = "machine1.foo.bar";
|
private String ns1nn1Hostname = "machine1.foo.bar";
|
||||||
private InetSocketAddress ns1nn1 =
|
private InetSocketAddress ns1nn1 =
|
||||||
@ -79,6 +81,9 @@ public class TestConfiguredFailoverProxyProvider {
|
|||||||
new InetSocketAddress(ns2nn3Hostname, rpcPort);
|
new InetSocketAddress(ns2nn3Hostname, rpcPort);
|
||||||
private String ns3;
|
private String ns3;
|
||||||
private static final int NUM_ITERATIONS = 50;
|
private static final int NUM_ITERATIONS = 50;
|
||||||
|
private String ns4;
|
||||||
|
private String ns4nn1Hostname = "localhost";
|
||||||
|
private String ns4nn2Hostname = "127.0.0.1";
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException exception = ExpectedException.none();
|
public final ExpectedException exception = ExpectedException.none();
|
||||||
@ -133,8 +138,11 @@ public void setup() throws URISyntaxException {
|
|||||||
ns3 = "mycluster-3-" + Time.monotonicNow();
|
ns3 = "mycluster-3-" + Time.monotonicNow();
|
||||||
ns3Uri = new URI("hdfs://" + ns3);
|
ns3Uri = new URI("hdfs://" + ns3);
|
||||||
|
|
||||||
|
ns4 = "mycluster-4-" + Time.monotonicNow();
|
||||||
|
ns4Uri = new URI("hdfs://" + ns4);
|
||||||
|
|
||||||
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES,
|
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES,
|
||||||
String.join(",", ns1, ns2, ns3));
|
String.join(",", ns1, ns2, ns3, ns4));
|
||||||
conf.set("fs.defaultFS", "hdfs://" + ns1);
|
conf.set("fs.defaultFS", "hdfs://" + ns1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,6 +178,33 @@ private void addDNSSettings(Configuration config,
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add lazy-resolution related settings to the passed-in configuration.
|
||||||
|
*/
|
||||||
|
private void addLazyResolvedSettings(Configuration config, boolean isLazy) {
|
||||||
|
config.set(
|
||||||
|
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns4,
|
||||||
|
"nn1,nn2,nn3");
|
||||||
|
config.set(
|
||||||
|
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns4 + ".nn1",
|
||||||
|
ns4nn1Hostname + ":" + rpcPort);
|
||||||
|
config.set(
|
||||||
|
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns4 + ".nn2",
|
||||||
|
ns4nn2Hostname + ":" + rpcPort);
|
||||||
|
config.set(
|
||||||
|
HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns4,
|
||||||
|
ConfiguredFailoverProxyProvider.class.getName());
|
||||||
|
if (isLazy) {
|
||||||
|
// Set dfs.client.failover.lazy.resolved=true (default false).
|
||||||
|
config.setBoolean(
|
||||||
|
HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED,
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
config.setBoolean(
|
||||||
|
HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns4,
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests getProxy with random.order configuration set to false.
|
* Tests getProxy with random.order configuration set to false.
|
||||||
* This expects the proxy order to be consistent every time a new
|
* This expects the proxy order to be consistent every time a new
|
||||||
@ -330,6 +365,51 @@ public void testResolveDomainNameUsingDNS() throws Exception {
|
|||||||
testResolveDomainNameUsingDNS(true);
|
testResolveDomainNameUsingDNS(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyResolved() throws IOException {
|
||||||
|
// Not lazy resolved.
|
||||||
|
testLazyResolved(false);
|
||||||
|
// Lazy resolved.
|
||||||
|
testLazyResolved(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testLazyResolved(boolean isLazy) throws IOException {
|
||||||
|
Configuration lazyResolvedConf = new Configuration(conf);
|
||||||
|
addLazyResolvedSettings(lazyResolvedConf, isLazy);
|
||||||
|
Map<InetSocketAddress, ClientProtocol> proxyMap = new HashMap<>();
|
||||||
|
|
||||||
|
InetSocketAddress ns4nn1 = new InetSocketAddress(ns4nn1Hostname, rpcPort);
|
||||||
|
InetSocketAddress ns4nn2 = new InetSocketAddress(ns4nn2Hostname, rpcPort);
|
||||||
|
|
||||||
|
// Mock ClientProtocol
|
||||||
|
final ClientProtocol nn1Mock = mock(ClientProtocol.class);
|
||||||
|
when(nn1Mock.getStats()).thenReturn(new long[]{0});
|
||||||
|
proxyMap.put(ns4nn1, nn1Mock);
|
||||||
|
|
||||||
|
final ClientProtocol nn2Mock = mock(ClientProtocol.class);
|
||||||
|
when(nn1Mock.getStats()).thenReturn(new long[]{0});
|
||||||
|
proxyMap.put(ns4nn2, nn2Mock);
|
||||||
|
|
||||||
|
ConfiguredFailoverProxyProvider<ClientProtocol> provider =
|
||||||
|
new ConfiguredFailoverProxyProvider<>(lazyResolvedConf, ns4Uri,
|
||||||
|
ClientProtocol.class, createFactory(proxyMap));
|
||||||
|
assertEquals(2, provider.proxies.size());
|
||||||
|
for (AbstractNNFailoverProxyProvider.NNProxyInfo proxyInfo : provider.proxies) {
|
||||||
|
if (isLazy) {
|
||||||
|
// With lazy resolution enabled, the proxy has not been used yet at this point,
|
||||||
|
// so the host must still be unresolved.
|
||||||
|
assertTrue(proxyInfo.getAddress().isUnresolved());
|
||||||
|
}else {
|
||||||
|
assertFalse(proxyInfo.getAddress().isUnresolved());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the host is used to process the request, the host is resolved.
|
||||||
|
ClientProtocol proxy = provider.getProxy().proxy;
|
||||||
|
proxy.getStats();
|
||||||
|
assertFalse(provider.proxies.get(0).getAddress().isUnresolved());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testResolveDomainNameUsingDNSUnknownHost() throws Exception {
|
public void testResolveDomainNameUsingDNSUnknownHost() throws Exception {
|
||||||
Configuration dnsConf = new Configuration(conf);
|
Configuration dnsConf = new Configuration(conf);
|
||||||
|
@ -4469,6 +4469,15 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.client.failover.lazy.resolved</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Used to enable lazy resolution of host->ip. If the value is true,
|
||||||
|
the host will be resolved only right before the DFSClient needs to contact it.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.client.key.provider.cache.expiry</name>
|
<name>dfs.client.key.provider.cache.expiry</name>
|
||||||
<value>864000000</value>
|
<value>864000000</value>
|
||||||
|
@ -1159,4 +1159,51 @@ public void testGetTransferRateInBytesPerSecond() {
|
|||||||
assertEquals(102_400_000,
|
assertEquals(102_400_000,
|
||||||
DFSUtil.getTransferRateInBytesPerSecond(512_000_000L, 5_000_000_000L));
|
DFSUtil.getTransferRateInBytesPerSecond(512_000_000L, 5_000_000_000L));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLazyResolved() {
|
||||||
|
// Not lazy resolved.
|
||||||
|
testLazyResolved(false);
|
||||||
|
// Lazy resolved.
|
||||||
|
testLazyResolved(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testLazyResolved(boolean isLazy) {
|
||||||
|
final String ns1Nn1 = "localhost:8020";
|
||||||
|
final String ns1Nn2 = "127.0.0.1:8020";
|
||||||
|
final String ns2Nn1 = "127.0.0.2:8020";
|
||||||
|
final String ns2Nn2 = "127.0.0.3:8020";
|
||||||
|
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
conf.set(DFS_NAMESERVICES, "ns1,ns2");
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"), "nn1,nn2");
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn1"), ns1Nn1);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), ns1Nn2);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns2"), "nn1,nn2");
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "nn1"), ns2Nn1);
|
||||||
|
conf.set(DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns2", "nn2"), ns2Nn2);
|
||||||
|
|
||||||
|
conf.setBoolean(HdfsClientConfigKeys.Failover.DFS_CLIENT_LAZY_RESOLVED, isLazy);
|
||||||
|
|
||||||
|
Map<String, Map<String, InetSocketAddress>> addresses =
|
||||||
|
DFSUtilClient.getAddresses(conf, null, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||||
|
|
||||||
|
addresses.forEach((ns, inetSocketAddressMap) -> {
|
||||||
|
inetSocketAddressMap.forEach((nn, inetSocketAddress) -> {
|
||||||
|
if (isLazy) {
|
||||||
|
// Lazy resolved. There is no need to resolve host->ip in advance.
|
||||||
|
assertTrue(inetSocketAddress.isUnresolved());
|
||||||
|
}else {
|
||||||
|
// Need to resolve all host->ip up front.
|
||||||
|
assertFalse(inetSocketAddress.isUnresolved());
|
||||||
|
}
|
||||||
|
assertEquals(inetSocketAddress.getPort(), 8020);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user