HDFS-14118. Support using DNS to resolve nameservices to IP addresses. Contributed by Fengnan Li.
This commit is contained in:
parent
f19c844e75
commit
f7a27cdee4
@ -21,6 +21,8 @@
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
||||
import org.apache.hadoop.net.DomainNameResolver;
|
||||
import org.apache.hadoop.net.DNSDomainNameResolver;
|
||||
|
||||
/**
|
||||
* This class contains constants for configuration keys used
|
||||
@ -393,4 +395,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||
public static final String ZK_RETRY_INTERVAL_MS =
|
||||
ZK_PREFIX + "retry-interval-ms";
|
||||
public static final int ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
|
||||
/** Default domain name resolver for hadoop to use. */
|
||||
public static final String HADOOP_DOMAINNAME_RESOLVER_IMPL =
|
||||
"hadoop.domainname.resolver.impl";
|
||||
public static final Class<? extends DomainNameResolver>
|
||||
HADOOP_DOMAINNAME_RESOLVER_IMPL_DEFAULT =
|
||||
DNSDomainNameResolver.class;
|
||||
}
|
||||
|
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
/**
|
||||
* DNSDomainNameResolver takes one domain name and returns all of the IP
|
||||
* addresses from the underlying DNS service.
|
||||
*/
|
||||
public class DNSDomainNameResolver implements DomainNameResolver {
|
||||
@Override
|
||||
public InetAddress[] getAllByDomainName(String domainName)
|
||||
throws UnknownHostException {
|
||||
return InetAddress.getAllByName(domainName);
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
/**
|
||||
* This interface provides methods for the failover proxy to get IP addresses
|
||||
* of the associated servers (NameNodes, RBF routers etc). Implementations will
|
||||
* use their own service discovery mechanism, DNS, Zookeeper etc
|
||||
*/
|
||||
public interface DomainNameResolver {
|
||||
/**
|
||||
* Takes one domain name and returns its IP addresses based on the actual
|
||||
* service discovery methods.
|
||||
*
|
||||
* @param domainName
|
||||
* @return all IP addresses
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
InetAddress[] getAllByDomainName(String domainName)
|
||||
throws UnknownHostException;
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* This class creates the DomainNameResolver instance based on the config.
|
||||
* It can either create the default resolver for the whole resolving for
|
||||
* hadoop or create individual resolver per nameservice or yarn.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public final class DomainNameResolverFactory {
|
||||
|
||||
private DomainNameResolverFactory() {
|
||||
// Utility classes should not have a public or default constructor
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a domain name resolver to convert the domain name in the config to
|
||||
* the actual IP addresses of the Namenode/Router/RM.
|
||||
*
|
||||
* @param conf Configuration to get the resolver from.
|
||||
* @param uri the url that the resolver will be used against
|
||||
* @param configKey The config key name suffixed with
|
||||
* the nameservice/yarnservice.
|
||||
* @return Domain name resolver.
|
||||
*/
|
||||
public static DomainNameResolver newInstance(
|
||||
Configuration conf, URI uri, String configKey) throws IOException {
|
||||
String host = uri.getHost();
|
||||
String confKeyWithHost = configKey + "." + host;
|
||||
return newInstance(conf, confKeyWithHost);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function gets the instance based on the config.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param configKey config key name.
|
||||
* @return Domain name resolver.
|
||||
* @throws IOException when the class cannot be found or initiated.
|
||||
*/
|
||||
public static DomainNameResolver newInstance(
|
||||
Configuration conf, String configKey) {
|
||||
Class<? extends DomainNameResolver> resolverClass = conf.getClass(
|
||||
configKey,
|
||||
DNSDomainNameResolver.class,
|
||||
DomainNameResolver.class);
|
||||
return ReflectionUtils.newInstance(resolverClass, conf);
|
||||
}
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Network-related classes.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
package org.apache.hadoop.net;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
@ -1,23 +0,0 @@
|
||||
<html>
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<body>
|
||||
Network-related classes.
|
||||
</body>
|
||||
</html>
|
@ -3360,4 +3360,14 @@
|
||||
address. (i.e 0.0.0.0)
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hadoop.domainname.resolver.impl</name>
|
||||
<value>org.apache.hadoop.net.DNSDomainNameResolver</value>
|
||||
<description>The implementation of DomainNameResolver used for service (NameNodes,
|
||||
RBF Routers etc) discovery. The default implementation
|
||||
org.apache.hadoop.net.DNSDomainNameResolver returns all IP addresses associated
|
||||
with the input domain name of the services by querying the underlying DNS.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
@ -0,0 +1,68 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This mock resolver class returns the predefined resolving results.
|
||||
* By default it uses a default "test.foo.bar" domain with two IP addresses.
|
||||
*/
|
||||
public class MockDomainNameResolver implements DomainNameResolver {
|
||||
|
||||
public static final String DOMAIN = "test.foo.bar";
|
||||
// This host will be used to mock non-resolvable host
|
||||
public static final String UNKNOW_DOMAIN = "unknown.foo.bar";
|
||||
public static final byte[] BYTE_ADDR_1 = new byte[]{10, 1, 1, 1};
|
||||
public static final byte[] BYTE_ADDR_2 = new byte[]{10, 1, 1, 2};
|
||||
public static final String ADDR_1 = "10.1.1.1";
|
||||
public static final String ADDR_2 = "10.1.1.2";
|
||||
|
||||
/** Internal mapping of domain names and IP addresses. */
|
||||
private Map<String, InetAddress[]> addrs = new TreeMap<>();
|
||||
|
||||
|
||||
public MockDomainNameResolver() {
|
||||
try {
|
||||
InetAddress nn1Address = InetAddress.getByAddress(BYTE_ADDR_1);
|
||||
InetAddress nn2Address = InetAddress.getByAddress(BYTE_ADDR_2);
|
||||
addrs.put(DOMAIN, new InetAddress[]{nn1Address, nn2Address});
|
||||
} catch (UnknownHostException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetAddress[] getAllByDomainName(String domainName)
|
||||
throws UnknownHostException {
|
||||
if (!addrs.containsKey(domainName)) {
|
||||
throw new UnknownHostException(domainName + " is not resolvable");
|
||||
}
|
||||
return addrs.get(domainName);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setAddressMap(Map<String, InetAddress[]> addresses) {
|
||||
this.addrs = addresses;
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.net;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* This class mainly test the MockDomainNameResolver comes working as expected.
|
||||
*/
|
||||
public class TestMockDomainNameResolver {
|
||||
|
||||
private Configuration conf;
|
||||
|
||||
@Rule
|
||||
public final ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
conf = new Configuration();
|
||||
conf.set(CommonConfigurationKeys.HADOOP_DOMAINNAME_RESOLVER_IMPL,
|
||||
MockDomainNameResolver.class.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMockDomainNameResolverCanBeCreated() throws IOException {
|
||||
DomainNameResolver resolver = DomainNameResolverFactory.newInstance(
|
||||
conf, CommonConfigurationKeys.HADOOP_DOMAINNAME_RESOLVER_IMPL);
|
||||
InetAddress[] addrs = resolver.getAllByDomainName(
|
||||
MockDomainNameResolver.DOMAIN);
|
||||
|
||||
assertEquals(2, addrs.length);
|
||||
assertEquals(MockDomainNameResolver.ADDR_1, addrs[0].getHostAddress());
|
||||
assertEquals(MockDomainNameResolver.ADDR_2, addrs[1].getHostAddress());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMockDomainNameResolverCanNotBeCreated()
|
||||
throws UnknownHostException {
|
||||
DomainNameResolver resolver = DomainNameResolverFactory.newInstance(
|
||||
conf, CommonConfigurationKeys.HADOOP_DOMAINNAME_RESOLVER_IMPL);
|
||||
exception.expect(UnknownHostException.class);
|
||||
resolver.getAllByDomainName(
|
||||
MockDomainNameResolver.UNKNOW_DOMAIN);
|
||||
}
|
||||
}
|
@ -288,6 +288,9 @@ interface Failover {
|
||||
int CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
|
||||
String RANDOM_ORDER = PREFIX + "random.order";
|
||||
boolean RANDOM_ORDER_DEFAULT = false;
|
||||
String RESOLVE_ADDRESS_NEEDED_KEY = PREFIX + "resolve-needed";
|
||||
boolean RESOLVE_ADDRESS_NEEDED_DEFAULT = false;
|
||||
String RESOLVE_SERVICE_KEY = PREFIX + "resolver.impl";
|
||||
}
|
||||
|
||||
/** dfs.client.write configuration properties */
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
@ -35,6 +36,8 @@
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||
import org.apache.hadoop.net.DomainNameResolver;
|
||||
import org.apache.hadoop.net.DomainNameResolverFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -176,6 +179,11 @@ protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
|
||||
}
|
||||
|
||||
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
|
||||
try {
|
||||
addressesOfNns = getResolvedAddressesIfNecessary(addressesOfNns, uri);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
for (InetSocketAddress address : addressesOfNns) {
|
||||
proxies.add(new NNProxyInfo<T>(address));
|
||||
}
|
||||
@ -192,6 +200,51 @@ protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
|
||||
return proxies;
|
||||
}
|
||||
|
||||
/**
|
||||
* If resolved is needed: for every domain name in the parameter list,
|
||||
* resolve them into the actual IP addresses.
|
||||
*
|
||||
* @param addressesOfNns The domain name list from config.
|
||||
* @param nameNodeUri The URI of namenode/nameservice.
|
||||
* @return The collection of resolved IP addresses.
|
||||
* @throws IOException If there are issues resolving the addresses.
|
||||
*/
|
||||
Collection<InetSocketAddress> getResolvedAddressesIfNecessary(
|
||||
Collection<InetSocketAddress> addressesOfNns, URI nameNodeUri)
|
||||
throws IOException {
|
||||
// 'host' here is usually the ID of the nameservice when address
|
||||
// resolving is needed.
|
||||
String host = nameNodeUri.getHost();
|
||||
String configKeyWithHost =
|
||||
HdfsClientConfigKeys.Failover.RESOLVE_ADDRESS_NEEDED_KEY + "." + host;
|
||||
boolean resolveNeeded = conf.getBoolean(configKeyWithHost,
|
||||
HdfsClientConfigKeys.Failover.RESOLVE_ADDRESS_NEEDED_DEFAULT);
|
||||
if (!resolveNeeded) {
|
||||
// Early return is no resolve is necessary
|
||||
return addressesOfNns;
|
||||
}
|
||||
|
||||
Collection<InetSocketAddress> addressOfResolvedNns = new ArrayList<>();
|
||||
DomainNameResolver dnr = DomainNameResolverFactory.newInstance(
|
||||
conf, nameNodeUri, HdfsClientConfigKeys.Failover.RESOLVE_SERVICE_KEY);
|
||||
// If the address needs to be resolved, get all of the IP addresses
|
||||
// from this address and pass them into the proxy
|
||||
LOG.info("Namenode domain name will be resolved with {}",
|
||||
dnr.getClass().getName());
|
||||
for (InetSocketAddress address : addressesOfNns) {
|
||||
InetAddress[] resolvedAddresses = dnr.getAllByDomainName(
|
||||
address.getHostName());
|
||||
int port = address.getPort();
|
||||
for (InetAddress raddress : resolvedAddresses) {
|
||||
InetSocketAddress resolvedAddress = new InetSocketAddress(
|
||||
raddress, port);
|
||||
addressOfResolvedNns.add(resolvedAddress);
|
||||
}
|
||||
}
|
||||
|
||||
return addressOfResolvedNns;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether random order is configured for failover proxy provider
|
||||
* for the namenode/nameservice.
|
||||
|
@ -20,17 +20,21 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.net.MockDomainNameResolver;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.slf4j.event.Level;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
@ -40,6 +44,7 @@
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
@ -54,6 +59,7 @@ public class TestConfiguredFailoverProxyProvider {
|
||||
private int rpcPort = 8020;
|
||||
private URI ns1Uri;
|
||||
private URI ns2Uri;
|
||||
private URI ns3Uri;
|
||||
private String ns1;
|
||||
private String ns1nn1Hostname = "machine1.foo.bar";
|
||||
private InetSocketAddress ns1nn1 =
|
||||
@ -71,8 +77,12 @@ public class TestConfiguredFailoverProxyProvider {
|
||||
private String ns2nn3Hostname = "router3.foo.bar";
|
||||
private InetSocketAddress ns2nn3 =
|
||||
new InetSocketAddress(ns2nn3Hostname, rpcPort);
|
||||
private String ns3;
|
||||
private static final int NUM_ITERATIONS = 50;
|
||||
|
||||
@Rule
|
||||
public final ExpectedException exception = ExpectedException.none();
|
||||
|
||||
@BeforeClass
|
||||
public static void setupClass() throws Exception {
|
||||
GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE);
|
||||
@ -120,10 +130,41 @@ public void setup() throws URISyntaxException {
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns2,
|
||||
true);
|
||||
|
||||
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns1 + "," + ns2);
|
||||
ns3 = "mycluster-3-" + Time.monotonicNow();
|
||||
ns3Uri = new URI("hdfs://" + ns3);
|
||||
|
||||
conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES,
|
||||
String.join(",", ns1, ns2, ns3));
|
||||
conf.set("fs.defaultFS", "hdfs://" + ns1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add more DNS related settings to the passed in configuration.
|
||||
* @param config Configuration file to add settings to.
|
||||
*/
|
||||
private void addDNSSettings(Configuration config, boolean hostResolvable) {
|
||||
config.set(
|
||||
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns3, "nn");
|
||||
String domain = hostResolvable
|
||||
? MockDomainNameResolver.DOMAIN
|
||||
: MockDomainNameResolver.UNKNOW_DOMAIN;
|
||||
config.set(
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns3 + ".nn",
|
||||
domain + ":" + rpcPort);
|
||||
config.set(
|
||||
HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ns3,
|
||||
ConfiguredFailoverProxyProvider.class.getName());
|
||||
config.setBoolean(
|
||||
HdfsClientConfigKeys.Failover.RESOLVE_ADDRESS_NEEDED_KEY + "." + ns3,
|
||||
true);
|
||||
config.set(
|
||||
HdfsClientConfigKeys.Failover.RESOLVE_SERVICE_KEY + "." + ns3,
|
||||
MockDomainNameResolver.class.getName());
|
||||
config.setBoolean(
|
||||
HdfsClientConfigKeys.Failover.RANDOM_ORDER + "." + ns3,
|
||||
true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests getProxy with random.order configuration set to false.
|
||||
* This expects the proxy order to be consistent every time a new
|
||||
@ -209,6 +250,98 @@ public void testRandomGetProxy() throws Exception {
|
||||
nn1Count.get() + nn2Count.get() + nn3Count.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResolveDomainNameUsingDNS() throws Exception {
|
||||
Configuration dnsConf = new Configuration(conf);
|
||||
addDNSSettings(dnsConf, true);
|
||||
|
||||
// Mock ClientProtocol
|
||||
Map<InetSocketAddress, ClientProtocol> proxyMap = new HashMap<>();
|
||||
final AtomicInteger nn1Count = addClientMock(
|
||||
MockDomainNameResolver.BYTE_ADDR_1, proxyMap);
|
||||
final AtomicInteger nn2Count = addClientMock(
|
||||
MockDomainNameResolver.BYTE_ADDR_2, proxyMap);
|
||||
|
||||
// Get a client multiple times
|
||||
final Map<String, AtomicInteger> proxyResults = new HashMap<>();
|
||||
for (int i = 0; i < NUM_ITERATIONS; i++) {
|
||||
@SuppressWarnings("resource")
|
||||
ConfiguredFailoverProxyProvider<ClientProtocol> provider =
|
||||
new ConfiguredFailoverProxyProvider<>(
|
||||
dnsConf, ns3Uri, ClientProtocol.class, createFactory(proxyMap));
|
||||
ClientProtocol proxy = provider.getProxy().proxy;
|
||||
String proxyAddress = provider.getProxy().proxyInfo;
|
||||
|
||||
if (proxyResults.containsKey(proxyAddress)) {
|
||||
proxyResults.get(proxyAddress).incrementAndGet();
|
||||
} else {
|
||||
proxyResults.put(proxyAddress, new AtomicInteger(1));
|
||||
}
|
||||
proxy.getStats();
|
||||
}
|
||||
|
||||
// Check we got the proper addresses
|
||||
assertEquals(2, proxyResults.size());
|
||||
assertTrue(
|
||||
"nn1 wasn't returned: " + proxyResults,
|
||||
proxyResults.containsKey(
|
||||
"/" + MockDomainNameResolver.ADDR_1 + ":8020"));
|
||||
assertTrue(
|
||||
"nn2 wasn't returned: " + proxyResults,
|
||||
proxyResults.containsKey(
|
||||
"/" + MockDomainNameResolver.ADDR_2 + ":8020"));
|
||||
|
||||
// Check that the Namenodes were invoked
|
||||
assertEquals(NUM_ITERATIONS, nn1Count.get() + nn2Count.get());
|
||||
assertTrue("nn1 was selected too much:" + nn1Count.get(),
|
||||
nn1Count.get() < NUM_ITERATIONS);
|
||||
assertTrue("nn1 should have been selected: " + nn1Count.get(),
|
||||
nn1Count.get() > 0);
|
||||
assertTrue("nn2 was selected too much:" + nn2Count.get(),
|
||||
nn2Count.get() < NUM_ITERATIONS);
|
||||
assertTrue(
|
||||
"nn2 should have been selected: " + nn2Count.get(),
|
||||
nn2Count.get() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResolveDomainNameUsingDNSUnknownHost() throws Exception {
|
||||
Configuration dnsConf = new Configuration(conf);
|
||||
addDNSSettings(dnsConf, false);
|
||||
|
||||
Map<InetSocketAddress, ClientProtocol> proxyMap = new HashMap<>();
|
||||
exception.expect(RuntimeException.class);
|
||||
ConfiguredFailoverProxyProvider<ClientProtocol> provider =
|
||||
new ConfiguredFailoverProxyProvider<>(
|
||||
dnsConf, ns3Uri, ClientProtocol.class, createFactory(proxyMap));
|
||||
|
||||
assertNull("failover proxy cannot be created due to unknownhost",
|
||||
provider);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a ClientProtocol mock for the proxy.
|
||||
* @param addr IP address for the destination.
|
||||
* @param proxyMap Map containing the client for each target address.
|
||||
* @return The counter for the number of calls to this target.
|
||||
* @throws Exception If the client cannot be created.
|
||||
*/
|
||||
private AtomicInteger addClientMock(
|
||||
byte[] addr, Map<InetSocketAddress, ClientProtocol> proxyMap)
|
||||
throws Exception {
|
||||
|
||||
final AtomicInteger counter = new AtomicInteger(0);
|
||||
InetAddress inetAddr = InetAddress.getByAddress(addr);
|
||||
InetSocketAddress inetSockerAddr =
|
||||
new InetSocketAddress(inetAddr, rpcPort);
|
||||
|
||||
final ClientProtocol cpMock = mock(ClientProtocol.class);
|
||||
when(cpMock.getStats()).thenAnswer(createAnswer(counter, 1));
|
||||
proxyMap.put(inetSockerAddr, cpMock);
|
||||
|
||||
return counter;
|
||||
}
|
||||
|
||||
/**
|
||||
* createAnswer creates an Answer for using with the ClientProtocol mocks.
|
||||
* @param counter counter to increment
|
||||
|
@ -3728,13 +3728,42 @@
|
||||
<value>false</value>
|
||||
<description>
|
||||
Determines if the failover proxies are picked in random order instead of the
|
||||
configured order. The prefix can be used with an optional nameservice ID
|
||||
configured order. Random order may be enabled for better load balancing
|
||||
or to avoid always hitting failed ones first if the failed ones appear in the
|
||||
beginning of the configured or resolved list.
|
||||
For example, In the case of multiple RBF routers or ObserverNameNodes,
|
||||
it is recommended to be turned on for load balancing.
|
||||
The config name can be extended with an optional nameservice ID
|
||||
(of form dfs.client.failover.random.order[.nameservice]) in case multiple
|
||||
nameservices exist and random order should be enabled for specific
|
||||
nameservices.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.client.failover.resolve-needed</name>
|
||||
<value>false</value>
|
||||
<description>
|
||||
Determines if the given nameservice address is a domain name which needs to
|
||||
be resolved (using the resolver configured by dfs.client.failover.resolver-impl).
|
||||
This adds a transparency layer in the client so physical server address
|
||||
can change without changing the client. The config name can be extended with
|
||||
an optional nameservice ID (of form dfs.client.failover.resolve-needed[.nameservice])
|
||||
to configure specific nameservices when multiple nameservices exist.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.client.failover.resolver.impl</name>
|
||||
<value>org.apache.hadoop.net.DNSDomainNameResolver</value>
|
||||
<description>
|
||||
Determines what class to use to resolve nameservice name to specific machine
|
||||
address(es). The config name can be extended with an optional nameservice ID
|
||||
(of form dfs.client.failover.resolver.impl[.nameservice]) to configure
|
||||
specific nameservices when multiple nameservices exist.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.client.key.provider.cache.expiry</name>
|
||||
<value>864000000</value>
|
||||
|
Loading…
Reference in New Issue
Block a user