HDFS-16157. Support configuring DNS record to get list of journal nodes contributed by Leon Gao. (#3284)
* Add DNS resolution for QJM * Add log * Resolve comments * checkstyle * typo
This commit is contained in:
parent
ad54f5195c
commit
b53cae0ffb
@ -645,6 +645,17 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
|
public static final String DFS_NAMENODE_EDITS_PLUGIN_PREFIX = "dfs.namenode.edits.journal-plugin";
|
||||||
public static final String DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY = "dfs.namenode.edits.dir.required";
|
public static final String DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY = "dfs.namenode.edits.dir.required";
|
||||||
public static final String DFS_NAMENODE_EDITS_DIR_DEFAULT = "file:///tmp/hadoop/dfs/name";
|
public static final String DFS_NAMENODE_EDITS_DIR_DEFAULT = "file:///tmp/hadoop/dfs/name";
|
||||||
|
|
||||||
|
public static final String
|
||||||
|
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED =
|
||||||
|
"dfs.namenode.edits.qjournals.resolution-enabled";
|
||||||
|
public static final boolean
|
||||||
|
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED_DEFAULT = false;
|
||||||
|
|
||||||
|
public static final String
|
||||||
|
DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL =
|
||||||
|
"dfs.namenode.edits.qjournals.resolver.impl";
|
||||||
|
|
||||||
public static final String DFS_METRICS_SESSION_ID_KEY =
|
public static final String DFS_METRICS_SESSION_ID_KEY =
|
||||||
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
|
HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY;
|
||||||
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
|
public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals";
|
||||||
|
@ -490,7 +490,7 @@ public static Set<String> getJournalNodeAddresses(
|
|||||||
" to append it with namenodeId");
|
" to append it with namenodeId");
|
||||||
URI uri = new URI(journalsUri);
|
URI uri = new URI(journalsUri);
|
||||||
List<InetSocketAddress> socketAddresses = Util.
|
List<InetSocketAddress> socketAddresses = Util.
|
||||||
getAddressesList(uri);
|
getAddressesList(uri, conf);
|
||||||
for (InetSocketAddress is : socketAddresses) {
|
for (InetSocketAddress is : socketAddresses) {
|
||||||
journalNodeList.add(is.getHostName());
|
journalNodeList.add(is.getHostName());
|
||||||
}
|
}
|
||||||
@ -501,7 +501,7 @@ public static Set<String> getJournalNodeAddresses(
|
|||||||
} else {
|
} else {
|
||||||
URI uri = new URI(journalsUri);
|
URI uri = new URI(journalsUri);
|
||||||
List<InetSocketAddress> socketAddresses = Util.
|
List<InetSocketAddress> socketAddresses = Util.
|
||||||
getAddressesList(uri);
|
getAddressesList(uri, conf);
|
||||||
for (InetSocketAddress is : socketAddresses) {
|
for (InetSocketAddress is : socketAddresses) {
|
||||||
journalNodeList.add(is.getHostName());
|
journalNodeList.add(is.getHostName());
|
||||||
}
|
}
|
||||||
@ -512,7 +512,7 @@ public static Set<String> getJournalNodeAddresses(
|
|||||||
return journalNodeList;
|
return journalNodeList;
|
||||||
} else {
|
} else {
|
||||||
URI uri = new URI(journalsUri);
|
URI uri = new URI(journalsUri);
|
||||||
List<InetSocketAddress> socketAddresses = Util.getAddressesList(uri);
|
List<InetSocketAddress> socketAddresses = Util.getAddressesList(uri, conf);
|
||||||
for (InetSocketAddress is : socketAddresses) {
|
for (InetSocketAddress is : socketAddresses) {
|
||||||
journalNodeList.add(is.getHostName());
|
journalNodeList.add(is.getHostName());
|
||||||
}
|
}
|
||||||
|
@ -414,7 +414,7 @@ static List<AsyncLogger> createLoggers(Configuration conf,
|
|||||||
String nameServiceId)
|
String nameServiceId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
List<AsyncLogger> ret = Lists.newArrayList();
|
List<AsyncLogger> ret = Lists.newArrayList();
|
||||||
List<InetSocketAddress> addrs = Util.getAddressesList(uri);
|
List<InetSocketAddress> addrs = Util.getAddressesList(uri, conf);
|
||||||
if (addrs.size() % 2 == 0) {
|
if (addrs.size() % 2 == 0) {
|
||||||
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
|
LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
|
||||||
"of Journal Nodes specified. This is not recommended!");
|
"of Journal Nodes specified. This is not recommended!");
|
||||||
|
@ -315,7 +315,7 @@ private List<InetSocketAddress> getJournalAddrList(String uriStr) throws
|
|||||||
IOException {
|
IOException {
|
||||||
URI uri = new URI(uriStr);
|
URI uri = new URI(uriStr);
|
||||||
return Util.getLoggerAddresses(uri,
|
return Util.getLoggerAddresses(uri,
|
||||||
Sets.newHashSet(jn.getBoundIpcAddress()));
|
Sets.newHashSet(jn.getBoundIpcAddress()), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
|
private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs,
|
||||||
|
@ -43,6 +43,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
|
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
|
||||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||||
import org.apache.hadoop.io.MD5Hash;
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
|
import org.apache.hadoop.net.DomainNameResolver;
|
||||||
|
import org.apache.hadoop.net.DomainNameResolverFactory;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
@ -361,7 +363,7 @@ private static MD5Hash parseMD5Header(HttpURLConnection connection) {
|
|||||||
return (header != null) ? new MD5Hash(header) : null;
|
return (header != null) ? new MD5Hash(header) : null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<InetSocketAddress> getAddressesList(URI uri)
|
public static List<InetSocketAddress> getAddressesList(URI uri, Configuration conf)
|
||||||
throws IOException{
|
throws IOException{
|
||||||
String authority = uri.getAuthority();
|
String authority = uri.getAuthority();
|
||||||
Preconditions.checkArgument(authority != null && !authority.isEmpty(),
|
Preconditions.checkArgument(authority != null && !authority.isEmpty(),
|
||||||
@ -372,8 +374,35 @@ public static List<InetSocketAddress> getAddressesList(URI uri)
|
|||||||
parts[i] = parts[i].trim();
|
parts[i] = parts[i].trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean resolveNeeded = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED_DEFAULT);
|
||||||
|
DomainNameResolver dnr = DomainNameResolverFactory.newInstance(
|
||||||
|
conf,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL);
|
||||||
|
|
||||||
List<InetSocketAddress> addrs = Lists.newArrayList();
|
List<InetSocketAddress> addrs = Lists.newArrayList();
|
||||||
for (String addr : parts) {
|
for (String addr : parts) {
|
||||||
|
if (resolveNeeded) {
|
||||||
|
LOG.info("Resolving journal address: " + addr);
|
||||||
|
InetSocketAddress isa = NetUtils.createSocketAddr(
|
||||||
|
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
|
||||||
|
// Get multiple hostnames from domain name if needed,
|
||||||
|
// for example multiple hosts behind a DNS entry.
|
||||||
|
int port = isa.getPort();
|
||||||
|
// QJM should just use FQDN
|
||||||
|
String[] hostnames = dnr
|
||||||
|
.getAllResolvedHostnameByDomainName(isa.getHostName(), true);
|
||||||
|
if (hostnames.length == 0) {
|
||||||
|
throw new UnknownHostException(addr);
|
||||||
|
}
|
||||||
|
for (String h : hostnames) {
|
||||||
|
addrs.add(NetUtils.createSocketAddr(
|
||||||
|
h + ":" + port,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
InetSocketAddress isa = NetUtils.createSocketAddr(
|
InetSocketAddress isa = NetUtils.createSocketAddr(
|
||||||
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
|
addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT);
|
||||||
if (isa.isUnresolved()) {
|
if (isa.isUnresolved()) {
|
||||||
@ -381,12 +410,13 @@ public static List<InetSocketAddress> getAddressesList(URI uri)
|
|||||||
}
|
}
|
||||||
addrs.add(isa);
|
addrs.add(isa);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return addrs;
|
return addrs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<InetSocketAddress> getLoggerAddresses(URI uri,
|
public static List<InetSocketAddress> getLoggerAddresses(URI uri,
|
||||||
Set<InetSocketAddress> addrsToExclude) throws IOException {
|
Set<InetSocketAddress> addrsToExclude, Configuration conf) throws IOException {
|
||||||
List<InetSocketAddress> addrsList = getAddressesList(uri);
|
List<InetSocketAddress> addrsList = getAddressesList(uri, conf);
|
||||||
addrsList.removeAll(addrsToExclude);
|
addrsList.removeAll(addrsToExclude);
|
||||||
return addrsList;
|
return addrsList;
|
||||||
}
|
}
|
||||||
|
@ -502,6 +502,25 @@
|
|||||||
<value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
|
<value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.edits.qjournals.resolution-enabled</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Determines if the given qjournals address is a domain name which needs to
|
||||||
|
be resolved.
|
||||||
|
This is used by namenode to resolve qjournals.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.edits.qjournals.resolver.impl</name>
|
||||||
|
<value></value>
|
||||||
|
<description>
|
||||||
|
Qjournals resolver implementation used by namenode.
|
||||||
|
Effective with dfs.namenode.edits.qjournals.resolution-enabled on.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.permissions.enabled</name>
|
<name>dfs.permissions.enabled</name>
|
||||||
<value>true</value>
|
<value>true</value>
|
||||||
|
@ -194,6 +194,15 @@ The order in which you set these configurations is unimportant, but the values y
|
|||||||
<value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
|
<value>qjournal://node1.example.com:8485;node2.example.com:8485;node3.example.com:8485/mycluster</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
You can also configure journal nodes by setting up dns round-robin record to avoid hardcoded names:
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.namenode.edits.qjournals.resolution-enabled</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
This will require you to configure multiple IPs behind one dns record on the host level, for example round robin DNS.
|
||||||
|
|
||||||
* **dfs.client.failover.proxy.provider.[nameservice ID]** - the Java class that HDFS clients use to contact the Active NameNode
|
* **dfs.client.failover.proxy.provider.[nameservice ID]** - the Java class that HDFS clients use to contact the Active NameNode
|
||||||
|
|
||||||
Configure the name of the Java class which will be used by the DFS Client to
|
Configure the name of the Java class which will be used by the DFS Client to
|
||||||
|
@ -33,13 +33,17 @@
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
|
import org.apache.hadoop.net.MockDomainNameResolver;
|
||||||
import org.apache.hadoop.util.Lists;
|
import org.apache.hadoop.util.Lists;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -1124,6 +1128,32 @@ public void testSelectViaRpcAfterJNRestart() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetJournalAddressListWithResolution() throws Exception {
|
||||||
|
Configuration configuration = new Configuration();
|
||||||
|
configuration.setBoolean(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_ENABLED, true);
|
||||||
|
configuration.set(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_EDITS_QJOURNALS_RESOLUTION_RESOLVER_IMPL,
|
||||||
|
MockDomainNameResolver.class.getName());
|
||||||
|
|
||||||
|
URI uriWithDomain = URI.create("qjournal://"
|
||||||
|
+ MockDomainNameResolver.DOMAIN + ":1234" + "/testns");
|
||||||
|
List<InetSocketAddress> result = Util.getAddressesList(uriWithDomain, configuration);
|
||||||
|
assertEquals(2, result.size());
|
||||||
|
assertEquals(new InetSocketAddress(MockDomainNameResolver.FQDN_1, 1234), result.get(0));
|
||||||
|
assertEquals(new InetSocketAddress(MockDomainNameResolver.FQDN_2, 1234), result.get(1));
|
||||||
|
|
||||||
|
uriWithDomain = URI.create("qjournal://"
|
||||||
|
+ MockDomainNameResolver.UNKNOW_DOMAIN + ":1234" + "/testns");
|
||||||
|
try{
|
||||||
|
Util.getAddressesList(uriWithDomain, configuration);
|
||||||
|
fail("Should throw unknown host exception.");
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private QuorumJournalManager createSpyingQJM()
|
private QuorumJournalManager createSpyingQJM()
|
||||||
throws IOException, URISyntaxException {
|
throws IOException, URISyntaxException {
|
||||||
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
|
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
|
||||||
|
Loading…
Reference in New Issue
Block a user