HDFS-9142. Separating Configuration object for namenode(s) in MiniDFSCluster. (Siqi Li via mingma)
This commit is contained in:
parent
c11fc8a1be
commit
de8efc65a4
@ -1998,6 +1998,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-9137. DeadLock between DataNode#refreshVolumes and
|
HDFS-9137. DeadLock between DataNode#refreshVolumes and
|
||||||
BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu)
|
BPOfferService#registrationSucceeded. (Uma Maheswara Rao G via yliu)
|
||||||
|
|
||||||
|
HDFS-9142. Separating Configuration object for namenode(s) in
|
||||||
|
MiniDFSCluster. (Siqi Li via mingma)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||||
@ -890,6 +891,44 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
|
|||||||
format, operation, clusterId, nnCounter);
|
format, operation, clusterId, nnCounter);
|
||||||
nnCounter += nameservice.getNNs().size();
|
nnCounter += nameservice.getNNs().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (NameNodeInfo nn : namenodes.values()) {
|
||||||
|
Configuration nnConf = nn.conf;
|
||||||
|
for (NameNodeInfo nnInfo : namenodes.values()) {
|
||||||
|
if (nn.equals(nnInfo)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void copyKeys(Configuration srcConf, Configuration destConf,
|
||||||
|
String nameserviceId, String nnId) {
|
||||||
|
String key = DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
nameserviceId, nnId);
|
||||||
|
destConf.set(key, srcConf.get(key));
|
||||||
|
|
||||||
|
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
||||||
|
nameserviceId, nnId);
|
||||||
|
String val = srcConf.get(key);
|
||||||
|
if (val != null) {
|
||||||
|
destConf.set(key, srcConf.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
|
||||||
|
nameserviceId, nnId);
|
||||||
|
val = srcConf.get(key);
|
||||||
|
if (val != null) {
|
||||||
|
destConf.set(key, srcConf.get(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
key = DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||||
|
nameserviceId, nnId);
|
||||||
|
val = srcConf.get(key);
|
||||||
|
if (val != null) {
|
||||||
|
destConf.set(key, srcConf.get(key));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -972,16 +1011,13 @@ private void configureNameService(MiniDFSNNTopology.NSConf nameservice, int nsCo
|
|||||||
// create all the namenodes in the namespace
|
// create all the namenodes in the namespace
|
||||||
nnIndex = nnCounter;
|
nnIndex = nnCounter;
|
||||||
for (NNConf nn : nameservice.getNNs()) {
|
for (NNConf nn : nameservice.getNNs()) {
|
||||||
initNameNodeConf(conf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
|
Configuration hdfsConf = new Configuration(conf);
|
||||||
|
initNameNodeConf(hdfsConf, nsId, nsCounter, nn.getNnId(), manageNameDfsDirs,
|
||||||
enableManagedDfsDirsRedundancy, nnIndex++);
|
enableManagedDfsDirsRedundancy, nnIndex++);
|
||||||
NameNodeInfo info = createNameNode(conf, false, operation,
|
createNameNode(hdfsConf, false, operation,
|
||||||
clusterId, nsId, nn.getNnId());
|
clusterId, nsId, nn.getNnId());
|
||||||
|
|
||||||
// Record the last namenode uri
|
// Record the last namenode uri
|
||||||
if (info != null && info.conf != null) {
|
lastDefaultFileSystem = hdfsConf.get(FS_DEFAULT_NAME_KEY);
|
||||||
lastDefaultFileSystem =
|
|
||||||
info.conf.get(FS_DEFAULT_NAME_KEY);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (!federation && lastDefaultFileSystem != null) {
|
if (!federation && lastDefaultFileSystem != null) {
|
||||||
// Set the default file system to the actual bind address of NN.
|
// Set the default file system to the actual bind address of NN.
|
||||||
@ -1198,50 +1234,39 @@ private static String[] createArgs(StartupOption operation) {
|
|||||||
return args;
|
return args;
|
||||||
}
|
}
|
||||||
|
|
||||||
private NameNodeInfo createNameNode(Configuration conf, boolean format, StartupOption operation,
|
private void createNameNode(Configuration hdfsConf, boolean format, StartupOption operation,
|
||||||
String clusterId, String nameserviceId, String nnId) throws IOException {
|
String clusterId, String nameserviceId, String nnId) throws IOException {
|
||||||
// Format and clean out DataNode directories
|
// Format and clean out DataNode directories
|
||||||
if (format) {
|
if (format) {
|
||||||
DFSTestUtil.formatNameNode(conf);
|
DFSTestUtil.formatNameNode(hdfsConf);
|
||||||
}
|
}
|
||||||
if (operation == StartupOption.UPGRADE){
|
if (operation == StartupOption.UPGRADE){
|
||||||
operation.setClusterId(clusterId);
|
operation.setClusterId(clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the NameNode after saving the default file system.
|
|
||||||
String originalDefaultFs = conf.get(FS_DEFAULT_NAME_KEY);
|
|
||||||
String[] args = createArgs(operation);
|
String[] args = createArgs(operation);
|
||||||
NameNode nn = NameNode.createNameNode(args, conf);
|
NameNode nn = NameNode.createNameNode(args, hdfsConf);
|
||||||
if (operation == StartupOption.RECOVER) {
|
if (operation == StartupOption.RECOVER) {
|
||||||
return null;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// After the NN has started, set back the bound ports into
|
// After the NN has started, set back the bound ports into
|
||||||
// the conf
|
// the conf
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
nameserviceId, nnId), nn.getNameNodeAddressHostPortString());
|
nameserviceId, nnId), nn.getNameNodeAddressHostPortString());
|
||||||
if (nn.getHttpAddress() != null) {
|
if (nn.getHttpAddress() != null) {
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
|
||||||
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress()));
|
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress()));
|
||||||
}
|
}
|
||||||
if (nn.getHttpsAddress() != null) {
|
if (nn.getHttpsAddress() != null) {
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
|
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTPS_ADDRESS_KEY,
|
||||||
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress()));
|
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpsAddress()));
|
||||||
}
|
}
|
||||||
|
copyKeys(hdfsConf, conf, nameserviceId, nnId);
|
||||||
DFSUtil.setGenericConf(conf, nameserviceId, nnId,
|
DFSUtil.setGenericConf(hdfsConf, nameserviceId, nnId,
|
||||||
DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||||
NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
|
NameNodeInfo info = new NameNodeInfo(nn, nameserviceId, nnId,
|
||||||
operation, new Configuration(conf));
|
operation, hdfsConf);
|
||||||
namenodes.put(nameserviceId, info);
|
namenodes.put(nameserviceId, info);
|
||||||
|
|
||||||
// Restore the default fs name
|
|
||||||
if (originalDefaultFs == null) {
|
|
||||||
conf.set(FS_DEFAULT_NAME_KEY, "");
|
|
||||||
} else {
|
|
||||||
conf.set(FS_DEFAULT_NAME_KEY, originalDefaultFs);
|
|
||||||
}
|
|
||||||
return info;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -2856,7 +2881,7 @@ private void checkSingleNameNode() {
|
|||||||
*
|
*
|
||||||
* @return newly started namenode
|
* @return newly started namenode
|
||||||
*/
|
*/
|
||||||
public NameNode addNameNode(Configuration conf, int namenodePort)
|
public void addNameNode(Configuration conf, int namenodePort)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if(!federation)
|
if(!federation)
|
||||||
throw new IOException("cannot add namenode to non-federated cluster");
|
throw new IOException("cannot add namenode to non-federated cluster");
|
||||||
@ -2875,7 +2900,7 @@ public NameNode addNameNode(Configuration conf, int namenodePort)
|
|||||||
NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
|
NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
|
||||||
int nnIndex = infos == null ? 0 : infos.length;
|
int nnIndex = infos == null ? 0 : infos.length;
|
||||||
initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
|
initNameNodeConf(conf, nameserviceId, nameServiceIndex, nnId, true, true, nnIndex);
|
||||||
NameNodeInfo info = createNameNode(conf, true, null, null, nameserviceId, nnId);
|
createNameNode(conf, true, null, null, nameserviceId, nnId);
|
||||||
|
|
||||||
// Refresh datanodes with the newly started namenode
|
// Refresh datanodes with the newly started namenode
|
||||||
for (DataNodeProperties dn : dataNodes) {
|
for (DataNodeProperties dn : dataNodes) {
|
||||||
@ -2885,7 +2910,6 @@ public NameNode addNameNode(Configuration conf, int namenodePort)
|
|||||||
|
|
||||||
// Wait for new namenode to get registrations from all the datanodes
|
// Wait for new namenode to get registrations from all the datanodes
|
||||||
waitActive(nnIndex);
|
waitActive(nnIndex);
|
||||||
return info.nameNode;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
|
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assume.assumeTrue;
|
import static org.junit.Assume.assumeTrue;
|
||||||
@ -28,6 +29,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.test.PathUtils;
|
import org.apache.hadoop.test.PathUtils;
|
||||||
@ -182,4 +184,49 @@ public void testClusterNoStorageTypeSetForDatanodes() throws IOException {
|
|||||||
MiniDFSCluster.shutdownCluster(cluster);
|
MiniDFSCluster.shutdownCluster(cluster);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetUpFederatedCluster() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).nnTopology(
|
||||||
|
MiniDFSNNTopology.simpleHAFederatedTopology(2))
|
||||||
|
.numDataNodes(2)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
cluster.transitionToActive(3);
|
||||||
|
assertEquals("standby", cluster.getNamesystem(0).getHAState());
|
||||||
|
assertEquals("active", cluster.getNamesystem(1).getHAState());
|
||||||
|
assertEquals("standby", cluster.getNamesystem(2).getHAState());
|
||||||
|
assertEquals("active", cluster.getNamesystem(3).getHAState());
|
||||||
|
|
||||||
|
String ns0nn0 = conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0"));
|
||||||
|
String ns0nn1 = conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1"));
|
||||||
|
String ns1nn0 = conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0"));
|
||||||
|
String ns1nn1 = conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1"));
|
||||||
|
|
||||||
|
for(NameNodeInfo nnInfo : cluster.getNameNodeInfos()) {
|
||||||
|
assertEquals(ns0nn0, nnInfo.conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn0")));
|
||||||
|
assertEquals(ns0nn1, nnInfo.conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns0", "nn1")));
|
||||||
|
assertEquals(ns1nn0, nnInfo.conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn0")));
|
||||||
|
assertEquals(ns1nn1, nnInfo.conf.get(
|
||||||
|
DFSUtil.addKeySuffixes(
|
||||||
|
DFS_NAMENODE_HTTP_ADDRESS_KEY, "ns1", "nn1")));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
MiniDFSCluster.shutdownCluster(cluster);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user