HDFS-12486. GetConf to get journalnodeslist. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
e1b32e0959
commit
cda3378659
@ -406,7 +406,7 @@ static Map<String, InetSocketAddress> getAddressesForNameserviceId(
|
||||
* @param keys list of keys in the order of preference
|
||||
* @return value of the key or default if a key was not found in configuration
|
||||
*/
|
||||
private static String getConfValue(String defaultValue, String keySuffix,
|
||||
public static String getConfValue(String defaultValue, String keySuffix,
|
||||
Configuration conf, String... keys) {
|
||||
String value = null;
|
||||
for (String key : keys) {
|
||||
|
@ -31,6 +31,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
|
||||
@ -44,6 +45,8 @@
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
@ -74,6 +77,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
@ -453,6 +457,85 @@ public static Set<String> getAllNnPrincipals(Configuration conf) throws IOExcept
|
||||
return principals;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of Journalnode addresses from the configuration.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return list of journalnode host names
|
||||
* @throws URISyntaxException
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Set<String> getJournalNodeAddresses(
|
||||
Configuration conf) throws URISyntaxException, IOException {
|
||||
Set<String> journalNodeList = new HashSet<>();
|
||||
String journalsUri = "";
|
||||
try {
|
||||
journalsUri = conf.get(DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
|
||||
if (journalsUri == null) {
|
||||
Collection<String> nameserviceIds = DFSUtilClient.
|
||||
getNameServiceIds(conf);
|
||||
for (String nsId : nameserviceIds) {
|
||||
journalsUri = DFSUtilClient.getConfValue(
|
||||
null, nsId, conf, DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
|
||||
if (journalsUri == null) {
|
||||
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
||||
for (String nnId : nnIds) {
|
||||
String suffix = DFSUtilClient.concatSuffixes(nsId, nnId);
|
||||
journalsUri = DFSUtilClient.getConfValue(
|
||||
null, suffix, conf, DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
|
||||
if (journalsUri == null ||
|
||||
!journalsUri.startsWith("qjournal://")) {
|
||||
return journalNodeList;
|
||||
} else {
|
||||
LOG.warn(DFS_NAMENODE_SHARED_EDITS_DIR_KEY +" is to be " +
|
||||
"configured as nameservice" +
|
||||
" specific key(append it with nameserviceId), no need" +
|
||||
" to append it with namenodeId");
|
||||
URI uri = new URI(journalsUri);
|
||||
List<InetSocketAddress> socketAddresses = Util.
|
||||
getAddressesList(uri);
|
||||
for (InetSocketAddress is : socketAddresses) {
|
||||
journalNodeList.add(is.getHostName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (!journalsUri.startsWith("qjournal://")) {
|
||||
return journalNodeList;
|
||||
} else {
|
||||
URI uri = new URI(journalsUri);
|
||||
List<InetSocketAddress> socketAddresses = Util.
|
||||
getAddressesList(uri);
|
||||
for (InetSocketAddress is : socketAddresses) {
|
||||
journalNodeList.add(is.getHostName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (!journalsUri.startsWith("qjournal://")) {
|
||||
return journalNodeList;
|
||||
} else {
|
||||
URI uri = new URI(journalsUri);
|
||||
List<InetSocketAddress> socketAddresses = Util.getAddressesList(uri);
|
||||
for (InetSocketAddress is : socketAddresses) {
|
||||
journalNodeList.add(is.getHostName());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch(UnknownHostException e) {
|
||||
LOG.error("The conf property " + DFS_NAMENODE_SHARED_EDITS_DIR_KEY
|
||||
+ " is not properly set with correct journal node hostnames");
|
||||
throw new UnknownHostException(journalsUri);
|
||||
} catch(URISyntaxException e) {
|
||||
LOG.error("The conf property " + DFS_NAMENODE_SHARED_EDITS_DIR_KEY
|
||||
+ "is not set properly with correct journal node uri");
|
||||
throw new URISyntaxException(journalsUri, "The conf property " +
|
||||
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + "is not" +
|
||||
" properly set with correct journal node uri");
|
||||
}
|
||||
|
||||
return journalNodeList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to backup node rpc
|
||||
* addresses from the configuration.
|
||||
|
@ -20,11 +20,13 @@
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -68,6 +70,7 @@ enum Command {
|
||||
SECONDARY("-secondaryNameNodes",
|
||||
"gets list of secondary namenodes in the cluster."),
|
||||
BACKUP("-backupNodes", "gets list of backup nodes in the cluster."),
|
||||
JOURNALNODE("-journalNodes", "gets list of journal nodes in the cluster."),
|
||||
INCLUDE_FILE("-includeFile",
|
||||
"gets the include file path that defines the datanodes " +
|
||||
"that can join the cluster."),
|
||||
@ -86,6 +89,8 @@ enum Command {
|
||||
new SecondaryNameNodesCommandHandler());
|
||||
map.put(StringUtils.toLowerCase(BACKUP.getName()),
|
||||
new BackupNodesCommandHandler());
|
||||
map.put(StringUtils.toLowerCase(JOURNALNODE.getName()),
|
||||
new JournalNodeCommandHandler());
|
||||
map.put(StringUtils.toLowerCase(INCLUDE_FILE.getName()),
|
||||
new CommandHandler(DFSConfigKeys.DFS_HOSTS));
|
||||
map.put(StringUtils.toLowerCase(EXCLUDE_FILE.getName()),
|
||||
@ -202,7 +207,19 @@ public int doWorkInternal(GetConf tool, String []args) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Handler for {@linke Command#JOURNALNODE}.
|
||||
*/
|
||||
static class JournalNodeCommandHandler extends CommandHandler {
|
||||
@Override
|
||||
public int doWorkInternal(GetConf tool, String[] args)
|
||||
throws URISyntaxException, IOException {
|
||||
tool.printSet(DFSUtil.getJournalNodeAddresses(tool.getConf()));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler for {@link Command#SECONDARY}
|
||||
*/
|
||||
@ -288,6 +305,18 @@ void printMap(Map<String, Map<String, InetSocketAddress>> map) {
|
||||
printOut(buffer.toString());
|
||||
}
|
||||
|
||||
void printSet(Set<String> journalnodes) {
|
||||
StringBuilder buffer = new StringBuilder();
|
||||
|
||||
for (String journalnode : journalnodes) {
|
||||
if (buffer.length() > 0) {
|
||||
buffer.append(" ");
|
||||
}
|
||||
buffer.append(journalnode);
|
||||
}
|
||||
printOut(buffer.toString());
|
||||
}
|
||||
|
||||
private void printUsage() {
|
||||
printError(USAGE);
|
||||
}
|
||||
|
@ -23,7 +23,10 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
@ -33,10 +36,14 @@
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@ -58,7 +65,7 @@
|
||||
*/
|
||||
public class TestGetConf {
|
||||
enum TestType {
|
||||
NAMENODE, BACKUP, SECONDARY, NNRPCADDRESSES
|
||||
NAMENODE, BACKUP, SECONDARY, NNRPCADDRESSES, JOURNALNODE
|
||||
}
|
||||
FileSystem localFileSys;
|
||||
/** Setup federation nameServiceIds in the configuration */
|
||||
@ -96,9 +103,10 @@ private String[] setupAddress(HdfsConfiguration conf, String key,
|
||||
* Add namenodes to the static resolution list to avoid going
|
||||
* through DNS which can be really slow in some configurations.
|
||||
*/
|
||||
private void setupStaticHostResolution(int nameServiceIdCount) {
|
||||
private void setupStaticHostResolution(int nameServiceIdCount,
|
||||
String hostname) {
|
||||
for (int i = 0; i < nameServiceIdCount; i++) {
|
||||
NetUtils.addStaticResolution("nn" + i, "localhost");
|
||||
NetUtils.addStaticResolution(hostname + i, "localhost");
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,6 +181,8 @@ private String getAddressListFromTool(TestType type, HdfsConfiguration conf,
|
||||
case NNRPCADDRESSES:
|
||||
args[0] = Command.NNRPCADDRESSES.getName();
|
||||
break;
|
||||
case JOURNALNODE:
|
||||
args[0] = Command.JOURNALNODE.getName();
|
||||
}
|
||||
return runTool(conf, args, success);
|
||||
}
|
||||
@ -321,7 +331,7 @@ public void testFederation() throws Exception {
|
||||
String[] nnAddresses = setupAddress(conf,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
|
||||
setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
|
||||
setupStaticHostResolution(nsCount);
|
||||
setupStaticHostResolution(nsCount, "nn");
|
||||
String[] backupAddresses = setupAddress(conf,
|
||||
DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000);
|
||||
String[] secondaryAddresses = setupAddress(conf,
|
||||
@ -348,7 +358,160 @@ public void testFederation() throws Exception {
|
||||
verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
|
||||
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests for journal node addresses.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testGetJournalNodes() throws Exception {
|
||||
|
||||
final int nsCount = 3;
|
||||
final String journalsBaseUri = "qjournal://jn0:9820;jn1:9820;jn2:9820";
|
||||
setupStaticHostResolution(nsCount, "jn");
|
||||
|
||||
// With out Name service Id
|
||||
HdfsConfiguration conf = new HdfsConfiguration(false);
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalsBaseUri+"/");
|
||||
|
||||
Set<String> expected = new HashSet<>();
|
||||
expected.add("jn0");
|
||||
expected.add("jn1");
|
||||
expected.add("jn2");
|
||||
|
||||
String expected1 = "";
|
||||
StringBuilder buffer = new StringBuilder();
|
||||
for (String val : expected) {
|
||||
if (buffer.length() > 0) {
|
||||
buffer.append(" ");
|
||||
}
|
||||
buffer.append(val);
|
||||
}
|
||||
buffer.append("\n");
|
||||
expected1 = buffer.toString();
|
||||
|
||||
Set<String> actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
String actual1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
assertEquals(expected1, actual1);
|
||||
conf.clear();
|
||||
|
||||
//With out Name service Id
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalsBaseUri + "/");
|
||||
|
||||
actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
actual1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
assertEquals(expected1, actual1);
|
||||
conf.clear();
|
||||
|
||||
|
||||
//Federation with HA, but suffixed only with Name service Id
|
||||
setupNameServices(conf, nsCount);
|
||||
conf.set(DFS_HA_NAMENODES_KEY_PREFIX +".ns0",
|
||||
"nn0,nn1");
|
||||
conf.set(DFS_HA_NAMENODES_KEY_PREFIX +".ns1",
|
||||
"nn0, nn1");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY+".ns0",
|
||||
journalsBaseUri + "/ns0");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY+".ns1",
|
||||
journalsBaseUri + "/ns1");
|
||||
|
||||
actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
expected1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
assertEquals(expected1, actual1);
|
||||
|
||||
conf.clear();
|
||||
|
||||
|
||||
// Federation with HA
|
||||
setupNameServices(conf, nsCount);
|
||||
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns0", "nn0,nn1");
|
||||
conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn0, nn1");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns0.nn0",
|
||||
journalsBaseUri + "/ns0");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns0.nn1",
|
||||
journalsBaseUri + "/ns0");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns1.nn2",
|
||||
journalsBaseUri + "/ns1");
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns1.nn3",
|
||||
journalsBaseUri + "/ns1");
|
||||
|
||||
actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
actual1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
assertEquals(expected1, actual1);
|
||||
|
||||
conf.clear();
|
||||
|
||||
// Name service setup, but no journal node
|
||||
setupNameServices(conf, nsCount);
|
||||
|
||||
expected = new HashSet<>();
|
||||
actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
actual1 = "\n";
|
||||
expected1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
assertEquals(expected1, actual1);
|
||||
conf.clear();
|
||||
|
||||
//name node edits dir is present, but set
|
||||
//to location of storage shared directory
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
"file:///mnt/filer1/dfs/ha-name-dir-shared");
|
||||
|
||||
expected = new HashSet<>();
|
||||
actual = DFSUtil.getJournalNodeAddresses(conf);
|
||||
assertEquals(expected.toString(), actual.toString());
|
||||
|
||||
expected1 = getAddressListFromTool(TestType.JOURNALNODE,
|
||||
conf, true);
|
||||
actual1 = "\n";
|
||||
assertEquals(expected1, actual1);
|
||||
conf.clear();
|
||||
}
|
||||
|
||||
/*
|
||||
** Test for unknown journal node host exception.
|
||||
*/
|
||||
@Test(expected = UnknownHostException.class, timeout = 10000)
|
||||
public void testUnknownJournalNodeHost()
|
||||
throws URISyntaxException, IOException {
|
||||
String journalsBaseUri = "qjournal://jn1:9820;jn2:9820;jn3:9820";
|
||||
HdfsConfiguration conf = new HdfsConfiguration(false);
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalsBaseUri + "/jndata");
|
||||
DFSUtil.getJournalNodeAddresses(conf);
|
||||
}
|
||||
|
||||
/*
|
||||
** Test for malformed journal node urisyntax exception.
|
||||
*/
|
||||
@Test(expected = URISyntaxException.class, timeout = 10000)
|
||||
public void testJournalNodeUriError()
|
||||
throws URISyntaxException, IOException {
|
||||
final int nsCount = 3;
|
||||
String journalsBaseUri = "qjournal://jn0 :9820;jn1:9820;jn2:9820";
|
||||
setupStaticHostResolution(nsCount, "jn");
|
||||
HdfsConfiguration conf = new HdfsConfiguration(false);
|
||||
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||
journalsBaseUri + "/jndata");
|
||||
DFSUtil.getJournalNodeAddresses(conf);
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testGetSpecificKey() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
@ -422,7 +585,7 @@ public void testIncludeInternalNameServices() throws Exception {
|
||||
setupAddress(conf, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsCount, 1000);
|
||||
setupAddress(conf, DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1500);
|
||||
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "ns1");
|
||||
setupStaticHostResolution(nsCount);
|
||||
setupStaticHostResolution(nsCount, "nn");
|
||||
|
||||
String[] includedNN = new String[] {"nn1:1001"};
|
||||
verifyAddresses(conf, TestType.NAMENODE, false, includedNN);
|
||||
|
Loading…
Reference in New Issue
Block a user