Revert "HDFS-10391. Always enable NameNode service RPC port. Contributed by Gergely Novak."

This reverts commit aa4b6fbe75.
This commit is contained in:
Arpit Agarwal 2017-09-14 11:17:08 -07:00
parent 66ca0a6540
commit 65a941008d
38 changed files with 293 additions and 471 deletions

View File

@ -74,7 +74,6 @@ public interface HdfsClientConfigKeys {
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address"; String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes"; String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
int DFS_NAMENODE_RPC_PORT_DEFAULT = 9820; int DFS_NAMENODE_RPC_PORT_DEFAULT = 9820;
int DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT = 9840;
String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY = String DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY =
"dfs.namenode.kerberos.principal"; "dfs.namenode.kerberos.principal";
String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";

View File

@ -35,7 +35,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -492,26 +491,62 @@ public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAd
return addressList; return addressList;
} }
/**
* Returns list of InetSocketAddresses corresponding to namenodes from the
* configuration.
*
* Returns namenode address specifically configured for datanodes (using
* service ports), if found. If not, regular RPC address configured for other
* clients is returned.
*
* @param conf configuration
* @return list of InetSocketAddress
* @throws IOException on error
*/
public static Map<String, Map<String, InetSocketAddress>> getNNServiceRpcAddresses(
Configuration conf) throws IOException {
// Use default address as fall back
String defaultAddress;
try {
defaultAddress = NetUtils.getHostPortString(
DFSUtilClient.getNNAddress(conf));
} catch (IllegalArgumentException e) {
defaultAddress = null;
}
Map<String, Map<String, InetSocketAddress>> addressList =
DFSUtilClient.getAddresses(conf, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY);
if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: namenode address "
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+ DFS_NAMENODE_RPC_ADDRESS_KEY
+ " is not configured.");
}
return addressList;
}
/** /**
* Returns list of InetSocketAddresses corresponding to the namenode * Returns list of InetSocketAddresses corresponding to the namenode
* that manages this cluster. Note this is to be used by datanodes to get * that manages this cluster. Note this is to be used by datanodes to get
* the list of namenode addresses to talk to. * the list of namenode addresses to talk to.
* *
* Returns namenode address specifically configured for datanodes * Returns namenode address specifically configured for datanodes (using
* service ports), if found. If not, regular RPC address configured for other
* clients is returned.
* *
* @param conf configuration * @param conf configuration
* @return list of InetSocketAddress * @return list of InetSocketAddress
* @throws IOException on error * @throws IOException on error
*/ */
public static Map<String, Map<String, InetSocketAddress>> public static Map<String, Map<String, InetSocketAddress>>
getNNServiceRpcAddresses(Configuration conf) throws IOException { getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
// Use default address as fall back // Use default address as fall back
String defaultAddress; String defaultAddress;
try { try {
InetSocketAddress rpcAddress = DFSUtilClient.getNNAddress(conf); defaultAddress = NetUtils.getHostPortString(
InetSocketAddress serviceAddress = InetSocketAddress.createUnresolved( DFSUtilClient.getNNAddress(conf));
rpcAddress.getHostName(), DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
defaultAddress = NetUtils.getHostPortString(serviceAddress);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
defaultAddress = null; defaultAddress = null;
} }
@ -534,47 +569,17 @@ public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAd
} }
} }
// If true, then replace the port numbers in the final address list
// with the default service RPC port.
boolean replacePortNumbers = false;
// First try to lookup using the service RPC address keys.
Map<String, Map<String, InetSocketAddress>> addressList = Map<String, Map<String, InetSocketAddress>> addressList =
DFSUtilClient.getAddressesForNsIds( DFSUtilClient.getAddressesForNsIds(conf, parentNameServices,
conf, parentNameServices, null, defaultAddress,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY); DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_RPC_ADDRESS_KEY);
// Next try to lookup using the RPC address key.
if (addressList.isEmpty()) {
replacePortNumbers = true;
addressList = DFSUtilClient.getAddressesForNsIds(
conf, parentNameServices, null, DFS_NAMENODE_RPC_ADDRESS_KEY);
}
// Finally, fallback to the default address.
// This will not yield the correct address in a federated/HA setup.
if (addressList.isEmpty()) {
addressList = DFSUtilClient.getAddressesForNsIds(
conf, parentNameServices, defaultAddress);
}
if (addressList.isEmpty()) { if (addressList.isEmpty()) {
throw new IOException("Incorrect configuration: namenode address " throw new IOException("Incorrect configuration: namenode address "
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or " + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
+ DFS_NAMENODE_RPC_ADDRESS_KEY + DFS_NAMENODE_RPC_ADDRESS_KEY
+ " is not configured."); + " is not configured.");
} }
if (replacePortNumbers) {
// Replace the RPC port(s) with the default service RPC port(s)
addressList.forEach((nsId, addresses) -> {
addresses.forEach((nnId, address) -> {
InetSocketAddress serviceAddress = InetSocketAddress.createUnresolved(
address.getHostName(), DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
addresses.put(nnId, serviceAddress);
});
});
}
return addressList; return addressList;
} }
@ -1225,17 +1230,12 @@ public static String getNamenodeServiceAddr(final Configuration conf,
String serviceAddrKey = DFSUtilClient.concatSuffixes( String serviceAddrKey = DFSUtilClient.concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId); DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
String serviceRpcAddr = conf.get(serviceAddrKey);
if (serviceRpcAddr == null) {
String addrKey = DFSUtilClient.concatSuffixes( String addrKey = DFSUtilClient.concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId); DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
String rpcAddress = conf.get(addrKey);
if (rpcAddress != null) { String serviceRpcAddr = conf.get(serviceAddrKey);
InetSocketAddress rpcAddr = NetUtils.createSocketAddr(rpcAddress); if (serviceRpcAddr == null) {
InetSocketAddress serviceAddr = InetSocketAddress.createUnresolved( serviceRpcAddr = conf.get(addrKey);
rpcAddr.getHostName(), DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
serviceRpcAddr = NetUtils.getHostPortString(serviceAddr);
}
} }
return serviceRpcAddr; return serviceRpcAddr;
} }

View File

@ -150,7 +150,7 @@ void refreshNamenodes(Configuration conf)
(DFSConfigKeys.DFS_NAMESERVICES)); (DFSConfigKeys.DFS_NAMESERVICES));
Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
.getNNServiceRpcAddresses(conf); .getNNServiceRpcAddressesForCluster(conf);
Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil Map<String, Map<String, InetSocketAddress>> newLifelineAddressMap = DFSUtil
.getNNLifelineRpcAddressesForCluster(conf); .getNNLifelineRpcAddressesForCluster(conf);

View File

@ -318,7 +318,7 @@ boolean shouldCheckpointAtStartup() {
private NamespaceInfo handshake(Configuration conf) throws IOException { private NamespaceInfo handshake(Configuration conf) throws IOException {
// connect to name node // connect to name node
InetSocketAddress nnAddress = NameNode.getServiceAddress(conf); InetSocketAddress nnAddress = NameNode.getServiceAddress(conf, true);
this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddress, this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddress,
NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), NamenodeProtocol.class, UserGroupInformation.getCurrentUser(),
true).getProxy(); true).getProxy();

View File

@ -1157,8 +1157,9 @@ void startCommonServices(Configuration conf, HAContext haContext) throws IOExcep
dir.setINodeAttributeProvider(inodeAttributeProvider); dir.setINodeAttributeProvider(inodeAttributeProvider);
} }
snapshotManager.registerMXBean(); snapshotManager.registerMXBean();
InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf); InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
this.nameNodeHostName = serviceAddress.getHostName(); this.nameNodeHostName = (serviceAddress != null) ?
serviceAddress.getHostName() : "";
} }
/** /**

View File

@ -505,17 +505,18 @@ public static void setServiceAddress(Configuration conf,
/** /**
* Fetches the address for services to use when connecting to namenode * Fetches the address for services to use when connecting to namenode
* based on the value of fallback returns null if the special
* address is not specified or returns the default namenode address
* to be used by both clients and services.
* Services here are datanodes, backup node, any non client connection * Services here are datanodes, backup node, any non client connection
*/ */
public static InetSocketAddress getServiceAddress(Configuration conf) { public static InetSocketAddress getServiceAddress(Configuration conf,
String address = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY); boolean fallback) {
if (address == null || address.isEmpty()) { String addr = conf.getTrimmed(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY);
InetSocketAddress rpcAddress = DFSUtilClient.getNNAddress(conf); if (addr == null || addr.isEmpty()) {
return NetUtils.createSocketAddr(rpcAddress.getHostName(), return fallback ? DFSUtilClient.getNNAddress(conf) : null;
HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
} }
return NetUtils.createSocketAddr(address, return DFSUtilClient.getNNAddress(addr);
HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
} }
// //
@ -553,7 +554,7 @@ InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
* If the service rpc is not configured returns null * If the service rpc is not configured returns null
*/ */
protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) { protected InetSocketAddress getServiceRpcServerAddress(Configuration conf) {
return NameNode.getServiceAddress(conf); return NameNode.getServiceAddress(conf, false);
} }
protected InetSocketAddress getRpcServerAddress(Configuration conf) { protected InetSocketAddress getRpcServerAddress(Configuration conf) {
@ -614,8 +615,7 @@ void setRpcLifelineServerAddress(Configuration conf,
} }
/** /**
* Modifies the configuration passed to contain the service rpc address * Modifies the configuration passed to contain the service rpc address setting
* setting.
*/ */
protected void setRpcServiceServerAddress(Configuration conf, protected void setRpcServiceServerAddress(Configuration conf,
InetSocketAddress serviceRPCAddress) { InetSocketAddress serviceRPCAddress) {
@ -1070,13 +1070,6 @@ public InetSocketAddress getServiceRpcAddress() {
return serviceAddr == null ? getNameNodeAddress() : serviceAddr; return serviceAddr == null ? getNameNodeAddress() : serviceAddr;
} }
/**
* @return NameNode service RPC address in "host:port" string form
*/
public String getServiceRpcAddressHostPortString() {
return NetUtils.getHostPortString(getServiceRpcAddress());
}
/** /**
* @return NameNode HTTP address, used by the Web UI, image transfer, * @return NameNode HTTP address, used by the Web UI, image transfer,
* and HTTP-based file system clients like WebHDFS * and HTTP-based file system clients like WebHDFS

View File

@ -333,16 +333,16 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
.newReflectiveBlockingService(traceAdminXlator); .newReflectiveBlockingService(traceAdminXlator);
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {
String bindHost = nn.getServiceRpcServerBindHost(conf); String bindHost = nn.getServiceRpcServerBindHost(conf);
if (bindHost == null) { if (bindHost == null) {
bindHost = serviceRpcAddr.getHostName(); bindHost = serviceRpcAddr.getHostName();
} }
LOG.info("Service RPC server is binding to " + bindHost + ":" + LOG.info("Service RPC server is binding to " + bindHost + ":" +
serviceRpcAddr.getPort()); serviceRpcAddr.getPort());
int serviceHandlerCount = conf.getInt( int serviceHandlerCount =
DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
serviceRpcServer = new RPC.Builder(conf) serviceRpcServer = new RPC.Builder(conf)
.setProtocol( .setProtocol(
@ -368,8 +368,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
refreshAuthService, serviceRpcServer); refreshAuthService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, serviceRpcServer); refreshUserMappingService, serviceRpcServer);
// We support Refreshing call queue here in case the client RPC queue // We support Refreshing call queue here in case the client RPC queue is full
// is full.
DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, serviceRpcServer); refreshCallQueueService, serviceRpcServer);
DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
@ -379,17 +378,21 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class,
traceAdminService, serviceRpcServer); traceAdminService, serviceRpcServer);
// Update the address with the correct port. // Update the address with the correct port
InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress(); InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
serviceRPCAddress = new InetSocketAddress( serviceRPCAddress = new InetSocketAddress(
serviceRpcAddr.getHostName(), listenAddr.getPort()); serviceRpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServiceServerAddress(conf, serviceRPCAddress); nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
} else {
serviceRpcServer = null;
serviceRPCAddress = null;
}
InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf); InetSocketAddress lifelineRpcAddr = nn.getLifelineRpcServerAddress(conf);
if (lifelineRpcAddr != null) { if (lifelineRpcAddr != null) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
bindHost = nn.getLifelineRpcServerBindHost(conf); String bindHost = nn.getLifelineRpcServerBindHost(conf);
if (bindHost == null) { if (bindHost == null) {
bindHost = lifelineRpcAddr.getHostName(); bindHost = lifelineRpcAddr.getHostName();
} }
@ -419,7 +422,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
lifelineProtoPbService, lifelineRpcServer); lifelineProtoPbService, lifelineRpcServer);
// Update the address with the correct port // Update the address with the correct port
listenAddr = lifelineRpcServer.getListenerAddress(); InetSocketAddress listenAddr = lifelineRpcServer.getListenerAddress();
lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(), lifelineRPCAddress = new InetSocketAddress(lifelineRpcAddr.getHostName(),
listenAddr.getPort()); listenAddr.getPort());
nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress); nn.setRpcLifelineServerAddress(conf, lifelineRPCAddress);
@ -429,7 +432,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
} }
InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
bindHost = nn.getRpcServerBindHost(conf); String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) { if (bindHost == null) {
bindHost = rpcAddr.getHostName(); bindHost = rpcAddr.getHostName();
} }
@ -473,14 +476,16 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
conf.getBoolean( conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
if (serviceRpcServer != null) {
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); lifelineRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
} }
} }
// The rpc-server port can be ephemeral... ensure we have the correct info // The rpc-server port can be ephemeral... ensure we have the correct info
listenAddr = clientRpcServer.getListenerAddress(); InetSocketAddress listenAddr = clientRpcServer.getListenerAddress();
clientRpcAddress = new InetSocketAddress( clientRpcAddress = new InetSocketAddress(
rpcAddr.getHostName(), listenAddr.getPort()); rpcAddr.getHostName(), listenAddr.getPort());
nn.setRpcServerAddress(conf, clientRpcAddress); nn.setRpcServerAddress(conf, clientRpcAddress);
@ -518,7 +523,9 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class); clientRpcServer.addSuppressedLoggingExceptions(StandbyException.class);
clientRpcServer.setTracer(nn.tracer); clientRpcServer.setTracer(nn.tracer);
if (serviceRpcServer != null) {
serviceRpcServer.setTracer(nn.tracer); serviceRpcServer.setTracer(nn.tracer);
}
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer); lifelineRpcServer.setTracer(nn.tracer);
} }
@ -547,7 +554,9 @@ RPC.Server getServiceRpcServer() {
*/ */
void start() { void start() {
clientRpcServer.start(); clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start(); serviceRpcServer.start();
}
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.start(); lifelineRpcServer.start();
} }
@ -558,7 +567,9 @@ void start() {
*/ */
void join() throws InterruptedException { void join() throws InterruptedException {
clientRpcServer.join(); clientRpcServer.join();
if (serviceRpcServer != null) {
serviceRpcServer.join(); serviceRpcServer.join();
}
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.join(); lifelineRpcServer.join();
} }
@ -571,7 +582,9 @@ void stop() {
if (clientRpcServer != null) { if (clientRpcServer != null) {
clientRpcServer.stop(); clientRpcServer.stop();
} }
if (serviceRpcServer != null) {
serviceRpcServer.stop(); serviceRpcServer.stop();
}
if (lifelineRpcServer != null) { if (lifelineRpcServer != null) {
lifelineRpcServer.stop(); lifelineRpcServer.stop();
} }

View File

@ -228,7 +228,7 @@ private void initialize(final Configuration conf,
// Create connection to the namenode. // Create connection to the namenode.
shouldRun = true; shouldRun = true;
nameNodeAddr = NameNode.getServiceAddress(conf); nameNodeAddr = NameNode.getServiceAddress(conf, true);
this.conf = conf; this.conf = conf;
this.namenode = NameNodeProxies.createNonHAProxy(conf, nameNodeAddr, this.namenode = NameNodeProxies.createNonHAProxy(conf, nameNodeAddr,

View File

@ -159,8 +159,7 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
for (RemoteNameNodeInfo info : nns) { for (RemoteNameNodeInfo info : nns) {
// overwrite the socket address, if we need to // overwrite the socket address, if we need to
InetSocketAddress ipc = NameNode.getServiceAddress( InetSocketAddress ipc = NameNode.getServiceAddress(info.getConfiguration(), true);
info.getConfiguration());
// sanity check the ipc address // sanity check the ipc address
Preconditions.checkArgument(ipc.getPort() > 0, Preconditions.checkArgument(ipc.getPort() > 0,
"Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc); "Active NameNode must have an IPC port configured. " + "Got address '%s'", ipc);

View File

@ -54,7 +54,7 @@ public static List<RemoteNameNodeInfo> getRemoteNameNodes(Configuration conf, St
for (Configuration otherNode : otherNodes) { for (Configuration otherNode : otherNodes) {
String otherNNId = HAUtil.getNameNodeId(otherNode, nsId); String otherNNId = HAUtil.getNameNodeId(otherNode, nsId);
// don't do any validation here as in some cases, it can be overwritten later // don't do any validation here as in some cases, it can be overwritten later
InetSocketAddress otherIpcAddr = NameNode.getServiceAddress(otherNode); InetSocketAddress otherIpcAddr = NameNode.getServiceAddress(otherNode, true);
final String scheme = DFSUtil.getHttpClientScheme(conf); final String scheme = DFSUtil.getHttpClientScheme(conf);

View File

@ -121,7 +121,7 @@ private void setNameNodeAddresses(Configuration conf) throws IOException {
private URL getHttpAddress(Configuration conf) throws IOException { private URL getHttpAddress(Configuration conf) throws IOException {
final String scheme = DFSUtil.getHttpClientScheme(conf); final String scheme = DFSUtil.getHttpClientScheme(conf);
String defaultHost = NameNode.getServiceAddress(conf).getHostName(); String defaultHost = NameNode.getServiceAddress(conf, true).getHostName();
URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme); URI addr = DFSUtil.getInfoServerWithDefaultHost(defaultHost, conf, scheme);
return addr.toURL(); return addr.toURL();
} }

View File

@ -187,7 +187,7 @@ int doWorkInternal(GetConf tool, String[] args) throws Exception {
static class NameNodesCommandHandler extends CommandHandler { static class NameNodesCommandHandler extends CommandHandler {
@Override @Override
int doWorkInternal(GetConf tool, String []args) throws IOException { int doWorkInternal(GetConf tool, String []args) throws IOException {
tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); tool.printMap(DFSUtil.getNNServiceRpcAddressesForCluster(tool.getConf()));
return 0; return 0;
} }
} }
@ -224,7 +224,7 @@ static class NNRpcAddressesCommandHandler extends CommandHandler {
public int doWorkInternal(GetConf tool, String []args) throws IOException { public int doWorkInternal(GetConf tool, String []args) throws IOException {
Configuration config = tool.getConf(); Configuration config = tool.getConf();
List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap( List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(config)); DFSUtil.getNNServiceRpcAddressesForCluster(config));
if (!cnnlist.isEmpty()) { if (!cnnlist.isEmpty()) {
for (ConfiguredNNAddress cnn : cnnlist) { for (ConfiguredNNAddress cnn : cnnlist) {
InetSocketAddress rpc = cnn.getAddress(); InetSocketAddress rpc = cnn.getAddress();

View File

@ -61,7 +61,8 @@
connecting to this address if it is configured. In the case of HA/Federation where multiple namenodes exist, connecting to this address if it is configured. In the case of HA/Federation where multiple namenodes exist,
the name service id is added to the name e.g. dfs.namenode.servicerpc-address.ns1 the name service id is added to the name e.g. dfs.namenode.servicerpc-address.ns1
dfs.namenode.rpc-address.EXAMPLENAMESERVICE dfs.namenode.rpc-address.EXAMPLENAMESERVICE
The value of this property will take the form of nn-host1:rpc-port. The NameNode's default service RPC port is 9840. The value of this property will take the form of nn-host1:rpc-port.
If the value of this property is unset the value of dfs.namenode.rpc-address will be used as the default.
</description> </description>
</property> </property>

View File

@ -166,7 +166,6 @@ public int getStoragesPerDatanode() {
*/ */
public static class Builder { public static class Builder {
private int nameNodePort = 0; private int nameNodePort = 0;
private int nameNodeServicePort = 0;
private int nameNodeHttpPort = 0; private int nameNodeHttpPort = 0;
private final Configuration conf; private final Configuration conf;
private int numDataNodes = 1; private int numDataNodes = 1;
@ -210,14 +209,6 @@ public Builder nameNodePort(int val) {
return this; return this;
} }
/**
* Default: 0
*/
public Builder nameNodeServicePort(int val) {
this.nameNodeServicePort = val;
return this;
}
/** /**
* Default: 0 * Default: 0
*/ */
@ -408,8 +399,8 @@ public Builder clusterId(String cid) {
} }
/** /**
* Default: false. * Default: false
* When true the hosts file/include file for the cluster is setup. * When true the hosts file/include file for the cluster is setup
*/ */
public Builder setupHostsFile(boolean val) { public Builder setupHostsFile(boolean val) {
this.setupHostsFile = val; this.setupHostsFile = val;
@ -419,7 +410,7 @@ public Builder setupHostsFile(boolean val) {
/** /**
* Default: a single namenode. * Default: a single namenode.
* See {@link MiniDFSNNTopology#simpleFederatedTopology(int)} to set up * See {@link MiniDFSNNTopology#simpleFederatedTopology(int)} to set up
* federated nameservices. * federated nameservices
*/ */
public Builder nnTopology(MiniDFSNNTopology topology) { public Builder nnTopology(MiniDFSNNTopology topology) {
this.nnTopology = topology; this.nnTopology = topology;
@ -470,8 +461,7 @@ protected MiniDFSCluster(Builder builder) throws IOException {
if (builder.nnTopology == null) { if (builder.nnTopology == null) {
// If no topology is specified, build a single NN. // If no topology is specified, build a single NN.
builder.nnTopology = MiniDFSNNTopology.simpleSingleNN( builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
builder.nameNodePort, builder.nameNodeServicePort, builder.nameNodePort, builder.nameNodeHttpPort);
builder.nameNodeHttpPort);
} }
assert builder.storageTypes == null || assert builder.storageTypes == null ||
builder.storageTypes.length == builder.numDataNodes; builder.storageTypes.length == builder.numDataNodes;
@ -780,7 +770,7 @@ public MiniDFSCluster(int nameNodePort,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, null, racks, hosts, operation, null, racks, hosts,
null, simulatedCapacities, null, true, false, null, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0, 0), MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0),
true, false, false, null, true, false); true, false, false, null, true, false);
} }
@ -1259,11 +1249,6 @@ private static void initNameNodeAddress(Configuration conf,
DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId, DFS_NAMENODE_RPC_ADDRESS_KEY, nameserviceId,
nnConf.getNnId()); nnConf.getNnId());
conf.set(key, "127.0.0.1:" + nnConf.getIpcPort()); conf.set(key, "127.0.0.1:" + nnConf.getIpcPort());
key = DFSUtil.addKeySuffixes(
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nameserviceId,
nnConf.getNnId());
conf.set(key, "127.0.0.1:" + nnConf.getServicePort());
} }
private static String[] createArgs(StartupOption operation) { private static String[] createArgs(StartupOption operation) {
@ -1297,8 +1282,6 @@ private void createNameNode(Configuration hdfsConf, boolean format, StartupOptio
// the conf // the conf
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
nameserviceId, nnId), nn.getNameNodeAddressHostPortString()); nameserviceId, nnId), nn.getNameNodeAddressHostPortString());
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
nameserviceId, nnId), nn.getServiceRpcAddressHostPortString());
if (nn.getHttpAddress() != null) { if (nn.getHttpAddress() != null) {
hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY, hdfsConf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_HTTP_ADDRESS_KEY,
nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress())); nameserviceId, nnId), NetUtils.getHostPortString(nn.getHttpAddress()));
@ -1354,14 +1337,6 @@ public Configuration getConfiguration(int nnIndex) {
return getNN(nnIndex).conf; return getNN(nnIndex).conf;
} }
/**
* Return the cluster-wide configuration.
* @return
*/
public Configuration getClusterConfiguration() {
return conf;
}
private NameNodeInfo getNN(int nnIndex) { private NameNodeInfo getNN(int nnIndex) {
int count = 0; int count = 0;
for (NameNodeInfo nn : namenodes.values()) { for (NameNodeInfo nn : namenodes.values()) {
@ -1954,16 +1929,6 @@ public int getNameNodePort(int nnIndex) {
return getNN(nnIndex).nameNode.getNameNodeAddress().getPort(); return getNN(nnIndex).nameNode.getNameNodeAddress().getPort();
} }
/**
* Gets the service rpc port used by the NameNode, because the caller
* supplied port is not necessarily the actual port used.
* Assumption: cluster has a single namenode
*/
public int getNameNodeServicePort() {
checkSingleNameNode();
return getNameNodeServicePort(0);
}
/** /**
* @return the service rpc port used by the NameNode at the given index. * @return the service rpc port used by the NameNode at the given index.
*/ */
@ -2591,14 +2556,12 @@ public void waitActive(int nnIndex) throws IOException {
} }
NameNodeInfo info = getNN(nnIndex); NameNodeInfo info = getNN(nnIndex);
InetSocketAddress nameNodeAddress = info.nameNode.getNameNodeAddress(); InetSocketAddress addr = info.nameNode.getServiceRpcAddress();
assert nameNodeAddress.getPort() != 0; assert addr.getPort() != 0;
DFSClient client = new DFSClient(nameNodeAddress, conf); DFSClient client = new DFSClient(addr, conf);
// ensure all datanodes have registered and sent heartbeat to the namenode // ensure all datanodes have registered and sent heartbeat to the namenode
InetSocketAddress serviceAddress = info.nameNode.getServiceRpcAddress(); while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE), addr)) {
while (shouldWait(client.datanodeReport(DatanodeReportType.LIVE),
serviceAddress)) {
try { try {
LOG.info("Waiting for cluster to become active"); LOG.info("Waiting for cluster to become active");
Thread.sleep(100); Thread.sleep(100);
@ -3093,18 +3056,13 @@ private void checkSingleNameNode() {
} }
} }
public void addNameNode(Configuration conf, int namenodePort)
throws IOException{
addNameNode(conf, namenodePort, 0);
}
/** /**
* Add a namenode to a federated cluster and start it. Configuration of * Add a namenode to a federated cluster and start it. Configuration of
* datanodes in the cluster is refreshed to register with the new namenode. * datanodes in the cluster is refreshed to register with the new namenode.
* *
* @return newly started namenode * @return newly started namenode
*/ */
public void addNameNode(Configuration conf, int namenodePort, int servicePort) public void addNameNode(Configuration conf, int namenodePort)
throws IOException { throws IOException {
if(!federation) if(!federation)
throw new IOException("cannot add namenode to non-federated cluster"); throw new IOException("cannot add namenode to non-federated cluster");
@ -3118,9 +3076,7 @@ public void addNameNode(Configuration conf, int namenodePort, int servicePort)
String nnId = null; String nnId = null;
initNameNodeAddress(conf, nameserviceId, initNameNodeAddress(conf, nameserviceId,
new NNConf(nnId) new NNConf(nnId).setIpcPort(namenodePort));
.setIpcPort(namenodePort)
.setServicePort(servicePort));
// figure out the current number of NNs // figure out the current number of NNs
NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId); NameNodeInfo[] infos = this.getNameNodeInfos(nameserviceId);
int nnIndex = infos == null ? 0 : infos.length; int nnIndex = infos == null ? 0 : infos.length;

View File

@ -43,13 +43,12 @@ public MiniDFSNNTopology() {
* Set up a simple non-federated non-HA NN. * Set up a simple non-federated non-HA NN.
*/ */
public static MiniDFSNNTopology simpleSingleNN( public static MiniDFSNNTopology simpleSingleNN(
int rpcPort, int servicePort, int httpPort) { int nameNodePort, int nameNodeHttpPort) {
return new MiniDFSNNTopology() return new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf(null) .addNameservice(new MiniDFSNNTopology.NSConf(null)
.addNN(new MiniDFSNNTopology.NNConf(null) .addNN(new MiniDFSNNTopology.NNConf(null)
.setIpcPort(rpcPort) .setHttpPort(nameNodeHttpPort)
.setServicePort(servicePort) .setIpcPort(nameNodePort)));
.setHttpPort(httpPort)));
} }
@ -222,7 +221,6 @@ public static class NNConf {
private final String nnId; private final String nnId;
private int httpPort; private int httpPort;
private int ipcPort; private int ipcPort;
private int servicePort;
private String clusterId; private String clusterId;
public NNConf(String nnId) { public NNConf(String nnId) {
@ -237,10 +235,6 @@ int getIpcPort() {
return ipcPort; return ipcPort;
} }
int getServicePort() {
return servicePort;
}
int getHttpPort() { int getHttpPort() {
return httpPort; return httpPort;
} }
@ -259,11 +253,6 @@ public NNConf setIpcPort(int ipcPort) {
return this; return this;
} }
public NNConf setServicePort(int servicePort) {
this.servicePort = servicePort;
return this;
}
public NNConf setClusterId(String clusterId) { public NNConf setClusterId(String clusterId) {
this.clusterId = clusterId; this.clusterId = clusterId;
return this; return this;

View File

@ -33,7 +33,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
@ -84,9 +83,9 @@
public class TestDFSUtil { public class TestDFSUtil {
private static final String NS1_NN_ADDR = "ns1-nn.example.com:9820"; static final String NS1_NN_ADDR = "ns1-nn.example.com:9820";
private static final String NS1_NN1_ADDR = "ns1-nn1.example.com:9820"; static final String NS1_NN1_ADDR = "ns1-nn1.example.com:9820";
private static final String NS1_NN2_ADDR = "ns1-nn2.example.com:9820"; static final String NS1_NN2_ADDR = "ns1-nn2.example.com:9820";
/** /**
* Reset to default UGI settings since some tests change them. * Reset to default UGI settings since some tests change them.
@ -274,13 +273,13 @@ public void testMultipleNamenodes() throws IOException {
assertEquals(1, nn1Map.size()); assertEquals(1, nn1Map.size());
InetSocketAddress addr = nn1Map.get(null); InetSocketAddress addr = nn1Map.get(null);
assertEquals("localhost", addr.getHostName()); assertEquals("localhost", addr.getHostName());
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT, addr.getPort()); assertEquals(9000, addr.getPort());
Map<String, InetSocketAddress> nn2Map = nnMap.get("nn2"); Map<String, InetSocketAddress> nn2Map = nnMap.get("nn2");
assertEquals(1, nn2Map.size()); assertEquals(1, nn2Map.size());
addr = nn2Map.get(null); addr = nn2Map.get(null);
assertEquals("localhost", addr.getHostName()); assertEquals("localhost", addr.getHostName());
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT, addr.getPort()); assertEquals(9001, addr.getPort());
// Test - can look up nameservice ID from service address // Test - can look up nameservice ID from service address
checkNameServiceId(conf, NN1_ADDRESS, "nn1"); checkNameServiceId(conf, NN1_ADDRESS, "nn1");
@ -315,8 +314,7 @@ public void testDefaultNamenode() throws IOException {
Map<String, InetSocketAddress> defaultNsMap = addrMap.get(null); Map<String, InetSocketAddress> defaultNsMap = addrMap.get(null);
assertEquals(1, defaultNsMap.size()); assertEquals(1, defaultNsMap.size());
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT, assertEquals(9999, defaultNsMap.get(null).getPort());
defaultNsMap.get(null).getPort());
} }
/** /**
@ -493,10 +491,6 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
final String NS1_NN2_HOST = "ns1-nn2.example.com:9820"; final String NS1_NN2_HOST = "ns1-nn2.example.com:9820";
final String NS2_NN1_HOST = "ns2-nn1.example.com:9820"; final String NS2_NN1_HOST = "ns2-nn1.example.com:9820";
final String NS2_NN2_HOST = "ns2-nn2.example.com:9820"; final String NS2_NN2_HOST = "ns2-nn2.example.com:9820";
final String NS1_NN1_SERVICE_HOST = "ns1-nn1.example.com:9840";
final String NS1_NN2_SERVICE_HOST = "ns1-nn2.example.com:9840";
final String NS2_NN1_SERVICE_HOST = "ns2-nn1.example.com:9840";
final String NS2_NN2_SERVICE_HOST = "ns2-nn2.example.com:9840";
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1"); conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://ns1");
// Two nameservices, each with two NNs. // Two nameservices, each with two NNs.
@ -530,14 +524,12 @@ public void testHANameNodesWithFederation() throws URISyntaxException {
assertEquals(NS2_NN1_HOST, map.get("ns2").get("ns2-nn1").toString()); assertEquals(NS2_NN1_HOST, map.get("ns2").get("ns2-nn1").toString());
assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString()); assertEquals(NS2_NN2_HOST, map.get("ns2").get("ns2-nn2").toString());
assertEquals(NS1_NN1_SERVICE_HOST, assertEquals(NS1_NN1_HOST,
DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn1")); DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn1"));
assertEquals(NS1_NN2_SERVICE_HOST, assertEquals(NS1_NN2_HOST,
DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn2")); DFSUtil.getNamenodeServiceAddr(conf, "ns1", "ns1-nn2"));
assertEquals(NS2_NN1_SERVICE_HOST, assertEquals(NS2_NN1_HOST,
DFSUtil.getNamenodeServiceAddr(conf, "ns2", "ns2-nn1")); DFSUtil.getNamenodeServiceAddr(conf, "ns2", "ns2-nn1"));
assertEquals(NS2_NN2_SERVICE_HOST,
DFSUtil.getNamenodeServiceAddr(conf, "ns2", "ns2-nn2"));
// No nameservice was given and we can't determine which service addr // No nameservice was given and we can't determine which service addr
// to use as two nameservices could share a namenode ID. // to use as two nameservices could share a namenode ID.
@ -563,11 +555,9 @@ public void getNameNodeServiceAddr() throws IOException {
// One nameservice with two NNs // One nameservice with two NNs
final String NS1_NN1_HOST = "ns1-nn1.example.com:9820"; final String NS1_NN1_HOST = "ns1-nn1.example.com:9820";
final String NS1_NN1_HOST_SVC = "ns1-nn1.example.com:9821"; final String NS1_NN1_HOST_SVC = "ns1-nn2.example.com:9821";
final String NS1_NN1_HOST_DEFAULT_SVC = "ns1-nn1.example.com:9840"; final String NS1_NN2_HOST = "ns1-nn1.example.com:9820";
final String NS1_NN2_HOST = "ns1-nn2.example.com:9820";
final String NS1_NN2_HOST_SVC = "ns1-nn2.example.com:9821"; final String NS1_NN2_HOST_SVC = "ns1-nn2.example.com:9821";
final String NS1_NN2_HOST_DEFAULT_SVC = "ns1-nn2.example.com:9840";
conf.set(DFS_NAMESERVICES, "ns1"); conf.set(DFS_NAMESERVICES, "ns1");
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2"); conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, "ns1"),"nn1,nn2");
@ -577,15 +567,12 @@ public void getNameNodeServiceAddr() throws IOException {
conf.set(DFSUtil.addKeySuffixes( conf.set(DFSUtil.addKeySuffixes(
DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST); DFS_NAMENODE_RPC_ADDRESS_KEY, "ns1", "nn2"), NS1_NN2_HOST);
// The default service rpc address is used if no service address is defined // The rpc address is used if no service address is defined
assertEquals(NS1_NN1_HOST_DEFAULT_SVC, assertEquals(NS1_NN1_HOST, DFSUtil.getNamenodeServiceAddr(conf, null, "nn1"));
DFSUtil.getNamenodeServiceAddr(conf, null, "nn1")); assertEquals(NS1_NN2_HOST, DFSUtil.getNamenodeServiceAddr(conf, null, "nn2"));
assertEquals(NS1_NN2_HOST_DEFAULT_SVC,
DFSUtil.getNamenodeServiceAddr(conf, null, "nn2"));
// A nameservice is specified explicitly // A nameservice is specified explicitly
assertEquals(NS1_NN1_HOST_DEFAULT_SVC, assertEquals(NS1_NN1_HOST, DFSUtil.getNamenodeServiceAddr(conf, "ns1", "nn1"));
DFSUtil.getNamenodeServiceAddr(conf, "ns1", "nn1"));
assertEquals(null, DFSUtil.getNamenodeServiceAddr(conf, "invalid", "nn1")); assertEquals(null, DFSUtil.getNamenodeServiceAddr(conf, "invalid", "nn1"));
// The service addrs are used when they are defined // The service addrs are used when they are defined
@ -1008,92 +995,6 @@ public void testGetPassword() throws Exception {
Assert.assertEquals(null, DFSUtil.getPassword(conf,"invalid-alias")); Assert.assertEquals(null, DFSUtil.getPassword(conf,"invalid-alias"));
} }
@Test
public void testGetNNServiceRpcAddresses() throws IOException {
Configuration conf = new HdfsConfiguration();
final String NN_HOST = "nn.example.com";
final String NN_ADDRESS = "hdfs://" + NN_HOST + ":9000/";
conf.set(FS_DEFAULT_NAME_KEY, NN_ADDRESS);
// No service RPC, no rpc
Map<String, Map<String, InetSocketAddress>> nsMap = DFSUtil
.getNNServiceRpcAddresses(conf);
assertEquals(1, nsMap.size());
InetSocketAddress address = nsMap.get(null).get(null);
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT,
address.getPort());
assertEquals(NN_HOST, address.getHostName());
// No service RPC
final String RPC_ADDRESS = NN_HOST + ":9191";
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, RPC_ADDRESS);
nsMap = DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, nsMap.size());
address = nsMap.get(null).get(null);
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT,
address.getPort());
assertEquals(NN_HOST, address.getHostName());
// Service RPC present
final String SERVICE_RPC_ADDRESS = NN_HOST + ":9292";
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, SERVICE_RPC_ADDRESS);
nsMap = DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, nsMap.size());
address = nsMap.get(null).get(null);
assertEquals(9292, address.getPort());
assertEquals(NN_HOST, address.getHostName());
}
@Test
public void testGetNNServiceRpcAddressesForHA() throws IOException {
Configuration conf = new HdfsConfiguration();
final String NS = "mycluster";
final String NN1_HOST = "nn1.example.com";
final String NN2_HOST = "nn2.example.com";
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, "hdfs://mycluster");
conf.set(DFS_NAMESERVICES, NS);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, NS),
"nn1,nn2");
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, NS, "nn1"),
NN1_HOST + ":9820");
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, NS, "nn2"),
NN2_HOST + ":9820");
assertTrue(HAUtil.isHAEnabled(conf, NS));
// Without Service RPC keys
Map<String, Map<String, InetSocketAddress>> nsMap =
DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, nsMap.size());
Map<String, InetSocketAddress> nnMap = nsMap.get(NS);
assertEquals(2, nnMap.size());
InetSocketAddress nn1Address = nnMap.get("nn1");
assertEquals(NN1_HOST, nn1Address.getHostName());
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT, nn1Address.getPort());
InetSocketAddress nn2Address = nnMap.get("nn2");
assertEquals(NN2_HOST, nn2Address.getHostName());
assertEquals(DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT, nn2Address.getPort());
// With Service RPC keys
final int CUSTOM_SERVICE_PORT = 9191;
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
NS, "nn1"), NN1_HOST + ":" + CUSTOM_SERVICE_PORT);
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
NS, "nn2"), NN2_HOST + ":" + CUSTOM_SERVICE_PORT);
nsMap = DFSUtil.getNNServiceRpcAddresses(conf);
assertEquals(1, nsMap.size());
nnMap = nsMap.get(NS);
assertEquals(2, nnMap.size());
nn1Address = nnMap.get("nn1");
assertEquals(NN1_HOST, nn1Address.getHostName());
assertEquals(CUSTOM_SERVICE_PORT, nn1Address.getPort());
nn2Address = nnMap.get("nn2");
assertEquals(NN2_HOST, nn2Address.getHostName());
assertEquals(CUSTOM_SERVICE_PORT, nn2Address.getPort());
}
@Test @Test
public void testGetNNServiceRpcAddressesForNsIds() throws IOException { public void testGetNNServiceRpcAddressesForNsIds() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
@ -1116,13 +1017,13 @@ public void testGetNNServiceRpcAddressesForNsIds() throws IOException {
} }
Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil Map<String, Map<String, InetSocketAddress>> nnMap = DFSUtil
.getNNServiceRpcAddresses(conf); .getNNServiceRpcAddressesForCluster(conf);
assertEquals(1, nnMap.size()); assertEquals(1, nnMap.size());
assertTrue(nnMap.containsKey("nn1")); assertTrue(nnMap.containsKey("nn1"));
conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3"); conf.set(DFS_INTERNAL_NAMESERVICES_KEY, "nn3");
try { try {
DFSUtil.getNNServiceRpcAddresses(conf); DFSUtil.getNNServiceRpcAddressesForCluster(conf);
fail("Should fail for misconfiguration"); fail("Should fail for misconfiguration");
} catch (IOException ignored) { } catch (IOException ignored) {
} }

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
@ -278,6 +277,8 @@ public void runTestNameNodePorts(boolean withService) throws Exception {
// different http port // different http port
conf2.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, THIS_HOST); conf2.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, THIS_HOST);
started = canStartNameNode(conf2); started = canStartNameNode(conf2);
if (withService) {
assertFalse("Should've failed on service port", started); assertFalse("Should've failed on service port", started);
// reset conf2 since NameNode modifies it // reset conf2 since NameNode modifies it
@ -286,6 +287,7 @@ public void runTestNameNodePorts(boolean withService) throws Exception {
// Set Service address // Set Service address
conf2.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, THIS_HOST); conf2.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, THIS_HOST);
started = canStartNameNode(conf2); started = canStartNameNode(conf2);
}
assertTrue(started); assertTrue(started);
} finally { } finally {
stopNameNode(nn); stopNameNode(nn);
@ -360,7 +362,6 @@ public void testSecondaryNodePorts() throws Exception {
/** /**
* Verify BackupNode port usage. * Verify BackupNode port usage.
*/ */
@Ignore
@Test(timeout = 300000) @Test(timeout = 300000)
public void testBackupNodePorts() throws Exception { public void testBackupNodePorts() throws Exception {
NameNode nn = null; NameNode nn = null;

View File

@ -324,7 +324,7 @@ public void run(FileSystem fileSystem) throws IOException {
} catch (RemoteException re) { } catch (RemoteException re) {
assertEquals(SafeModeException.class.getName(), re.getClassName()); assertEquals(SafeModeException.class.getName(), re.getClassName());
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
NameNode.getServiceAddress(conf).getHostName(), re); NameNode.getServiceAddress(conf, true).getHostName(), re);
} catch (IOException ioe) { } catch (IOException ioe) {
fail("Encountered exception" + " " + StringUtils.stringifyException(ioe)); fail("Encountered exception" + " " + StringUtils.stringifyException(ioe));
} }

View File

@ -77,9 +77,7 @@ public Builder setNumNameNodes(int nns) {
public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) { public static MiniDFSNNTopology createDefaultTopology(int nns, int startingPort) {
MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE); MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(NAMESERVICE);
for (int i = 0; i < nns; i++) { for (int i = 0; i < nns; i++) {
nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i) nameservice.addNN(new MiniDFSNNTopology.NNConf("nn" + i).setIpcPort(startingPort++)
.setIpcPort(startingPort++)
.setServicePort(startingPort++)
.setHttpPort(startingPort++)); .setHttpPort(startingPort++));
} }
@ -150,9 +148,8 @@ private Configuration initHAConf(URI journalURI, Configuration conf,
int port = basePort; int port = basePort;
for (int i = 0; i < numNNs; i++) { for (int i = 0; i < numNNs; i++) {
nns.add("127.0.0.1:" + port); nns.add("127.0.0.1:" + port);
// increment by 3 each time to account for the http and the service port // increment by 2 each time to account for the http port in the config setting
// in the config setting port += 2;
port += 3;
} }
// use standard failover configurations // use standard failover configurations

View File

@ -89,8 +89,7 @@ public void testBalancerWithHANameNodes() throws Exception {
/ numOfDatanodes, (short) numOfDatanodes, 1); / numOfDatanodes, (short) numOfDatanodes, 1);
// start up an empty node with the same capacity and on the same rack // start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(cluster.getClusterConfiguration(), cluster.startDataNodes(conf, 1, true, null, new String[] { newNodeRack },
1, true, null, new String[] {newNodeRack},
new long[] { newNodeCapacity }); new long[] { newNodeCapacity });
totalCapacity += newNodeCapacity; totalCapacity += newNodeCapacity;
TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client, TestBalancer.waitForHeartBeat(totalUsedSpace, totalCapacity, client,

View File

@ -105,11 +105,8 @@ public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
* *
* @throws IOException * @throws IOException
*/ */
public static DataNode startDNWithMockNN( public static DataNode startDNWithMockNN(Configuration conf,
Configuration conf, final InetSocketAddress nnSocketAddr, final String dnDataDir)
final InetSocketAddress nnSocketAddr,
final InetSocketAddress nnServiceAddr,
final String dnDataDir)
throws IOException { throws IOException {
FileSystem.setDefaultUri(conf, "hdfs://" + nnSocketAddr.getHostName() + ":" FileSystem.setDefaultUri(conf, "hdfs://" + nnSocketAddr.getHostName() + ":"
@ -152,7 +149,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
@Override @Override
DatanodeProtocolClientSideTranslatorPB connectToNN( DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException { InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals(nnServiceAddr, nnAddr); Assert.assertEquals(nnSocketAddr, nnAddr);
return namenode; return namenode;
} }
}; };

View File

@ -124,6 +124,8 @@ public class TestBlockRecovery {
private final static long RECOVERY_ID = 3000L; private final static long RECOVERY_ID = 3000L;
private final static String CLUSTER_ID = "testClusterID"; private final static String CLUSTER_ID = "testClusterID";
private final static String POOL_ID = "BP-TEST"; private final static String POOL_ID = "BP-TEST";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
private final static long BLOCK_ID = 1000L; private final static long BLOCK_ID = 1000L;
private final static long GEN_STAMP = 2000L; private final static long GEN_STAMP = 2000L;
private final static long BLOCK_LEN = 3000L; private final static long BLOCK_LEN = 3000L;
@ -186,7 +188,7 @@ public void startUp() throws IOException, URISyntaxException {
} }
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf, FileSystem.setDefaultUri(conf,
"hdfs://localhost:5020"); "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>(); ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
File dataDir = new File(DATA_DIR); File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir); FileUtil.fullyDelete(dataDir);
@ -229,7 +231,7 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
@Override @Override
DatanodeProtocolClientSideTranslatorPB connectToNN( DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException { InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals("localhost:9840", nnAddr.toString()); Assert.assertEquals(NN_ADDR, nnAddr);
return namenode; return namenode;
} }
}; };

View File

@ -61,16 +61,11 @@
public class TestDataNodeMetricsLogger { public class TestDataNodeMetricsLogger {
static final Log LOG = LogFactory.getLog(TestDataNodeMetricsLogger.class); static final Log LOG = LogFactory.getLog(TestDataNodeMetricsLogger.class);
@Rule
public Timeout globalTimeout = new Timeout(120_000);
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+ "data"; + "data";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress( private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020); "localhost", 5020);
private final static InetSocketAddress NN_SERVICE_ADDR =
new InetSocketAddress("localhost", 5021);
private DataNode dn; private DataNode dn;
@ -91,13 +86,10 @@ public void startDNForTest(boolean enableMetricsLogging) throws IOException {
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
NN_SERVICE_ADDR.getHostName() + ":" + NN_SERVICE_ADDR.getPort());
conf.setInt(DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, conf.setInt(DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
enableMetricsLogging ? 1 : 0); // If enabled, log early and log often enableMetricsLogging ? 1 : 0); // If enabled, log early and log often
dn = InternalDataNodeTestUtils.startDNWithMockNN( dn = InternalDataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
conf, NN_ADDR, NN_SERVICE_ADDR, DATA_DIR);
} }
/** /**

View File

@ -109,16 +109,16 @@ public void test2NNRegistration() throws IOException {
BPOfferService bpos2 = dn.getAllBpOs().get(1); BPOfferService bpos2 = dn.getAllBpOs().get(1);
// The order of bpos is not guaranteed, so fix the order // The order of bpos is not guaranteed, so fix the order
if (getNNSocketAddress(bpos1).equals(nn2.getServiceRpcAddress())) { if (getNNSocketAddress(bpos1).equals(nn2.getNameNodeAddress())) {
BPOfferService tmp = bpos1; BPOfferService tmp = bpos1;
bpos1 = bpos2; bpos1 = bpos2;
bpos2 = tmp; bpos2 = tmp;
} }
assertEquals("wrong nn address", getNNSocketAddress(bpos1), assertEquals("wrong nn address", getNNSocketAddress(bpos1),
nn1.getServiceRpcAddress()); nn1.getNameNodeAddress());
assertEquals("wrong nn address", getNNSocketAddress(bpos2), assertEquals("wrong nn address", getNNSocketAddress(bpos2),
nn2.getServiceRpcAddress()); nn2.getNameNodeAddress());
assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2); assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
assertEquals("wrong cid", dn.getClusterId(), cid1); assertEquals("wrong cid", dn.getClusterId(), cid1);
@ -182,7 +182,7 @@ public void testFedSingleNN() throws IOException {
assertEquals("wrong nn address", assertEquals("wrong nn address",
getNNSocketAddress(bpos1), getNNSocketAddress(bpos1),
nn1.getServiceRpcAddress()); nn1.getNameNodeAddress());
assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1); assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
assertEquals("wrong cid", dn.getClusterId(), cid1); assertEquals("wrong cid", dn.getClusterId(), cid1);
cluster.shutdown(); cluster.shutdown();

View File

@ -51,10 +51,8 @@ public class TestDataNodeReconfiguration {
private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class); private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+ "data"; + "data";
private final static InetSocketAddress NN_ADDR = private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
new InetSocketAddress("localhost", 5020); "localhost", 5020);
private final static InetSocketAddress NN_SERVICE_ADDR =
new InetSocketAddress("localhost", 5021);
private final int NUM_NAME_NODE = 1; private final int NUM_NAME_NODE = 1;
private final int NUM_DATA_NODE = 10; private final int NUM_DATA_NODE = 10;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
@ -101,13 +99,10 @@ public DataNode[] createDNsForTest(int numDateNode) throws IOException {
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
NN_SERVICE_ADDR.getHostName() + ":" + NN_SERVICE_ADDR.getPort());
DataNode[] result = new DataNode[numDateNode]; DataNode[] result = new DataNode[numDateNode];
for (int i = 0; i < numDateNode; i++) { for (int i = 0; i < numDateNode; i++) {
result[i] = InternalDataNodeTestUtils.startDNWithMockNN( result[i] = InternalDataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
conf, NN_ADDR, NN_SERVICE_ADDR, DATA_DIR);
} }
return result; return result;
} }

View File

@ -78,6 +78,8 @@ public class TestDatanodeProtocolRetryPolicy {
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>(); ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
private final static String CLUSTER_ID = "testClusterID"; private final static String CLUSTER_ID = "testClusterID";
private final static String POOL_ID = "BP-TEST"; private final static String POOL_ID = "BP-TEST";
private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
"localhost", 5020);
private static DatanodeRegistration datanodeRegistration = private static DatanodeRegistration datanodeRegistration =
DFSTestUtil.getLocalDatanodeRegistration(); DFSTestUtil.getLocalDatanodeRegistration();
@ -99,7 +101,7 @@ public void startUp() throws IOException, URISyntaxException {
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
FileSystem.setDefaultUri(conf, FileSystem.setDefaultUri(conf,
"hdfs://localhost:5020"); "hdfs://" + NN_ADDR.getHostName() + ":" + NN_ADDR.getPort());
File dataDir = new File(DATA_DIR); File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir); FileUtil.fullyDelete(dataDir);
dataDir.mkdirs(); dataDir.mkdirs();
@ -226,7 +228,7 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
@Override @Override
DatanodeProtocolClientSideTranslatorPB connectToNN( DatanodeProtocolClientSideTranslatorPB connectToNN(
InetSocketAddress nnAddr) throws IOException { InetSocketAddress nnAddr) throws IOException {
Assert.assertEquals("localhost:9840", nnAddr.toString()); Assert.assertEquals(NN_ADDR, nnAddr);
return namenode; return namenode;
} }
}; };

View File

@ -44,11 +44,6 @@ public class TestRefreshNamenodes {
private final int nnPort3 = 2227; private final int nnPort3 = 2227;
private final int nnPort4 = 2230; private final int nnPort4 = 2230;
private final int nnServicePort1 = 2222;
private final int nnServicePort2 = 2225;
private final int nnServicePort3 = 2228;
private final int nnServicePort4 = 2231;
@Test @Test
public void testRefreshNamenodes() throws IOException { public void testRefreshNamenodes() throws IOException {
// Start cluster with a single NN and DN // Start cluster with a single NN and DN
@ -57,9 +52,7 @@ public void testRefreshNamenodes() throws IOException {
try { try {
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new NSConf("ns1").addNN( .addNameservice(new NSConf("ns1").addNN(
new NNConf(null) new NNConf(null).setIpcPort(nnPort1)))
.setIpcPort(nnPort1)
.setServicePort(nnServicePort1)))
.setFederation(true); .setFederation(true);
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology) .nnTopology(topology)
@ -68,20 +61,20 @@ public void testRefreshNamenodes() throws IOException {
DataNode dn = cluster.getDataNodes().get(0); DataNode dn = cluster.getDataNodes().get(0);
assertEquals(1, dn.getAllBpOs().size()); assertEquals(1, dn.getAllBpOs().size());
cluster.addNameNode(conf, nnPort2, nnServicePort2); cluster.addNameNode(conf, nnPort2);
assertEquals(2, dn.getAllBpOs().size()); assertEquals(2, dn.getAllBpOs().size());
cluster.addNameNode(conf, nnPort3, nnServicePort3); cluster.addNameNode(conf, nnPort3);
assertEquals(3, dn.getAllBpOs().size()); assertEquals(3, dn.getAllBpOs().size());
cluster.addNameNode(conf, nnPort4, nnServicePort4); cluster.addNameNode(conf, nnPort4);
// Ensure a BPOfferService in the datanodes corresponds to // Ensure a BPOfferService in the datanodes corresponds to
// a namenode in the cluster // a namenode in the cluster
Set<InetSocketAddress> nnAddrsFromCluster = Sets.newHashSet(); Set<InetSocketAddress> nnAddrsFromCluster = Sets.newHashSet();
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
assertTrue(nnAddrsFromCluster.add( assertTrue(nnAddrsFromCluster.add(
cluster.getNameNode(i).getServiceRpcAddress())); cluster.getNameNode(i).getNameNodeAddress()));
} }
Set<InetSocketAddress> nnAddrsFromDN = Sets.newHashSet(); Set<InetSocketAddress> nnAddrsFromDN = Sets.newHashSet();

View File

@ -54,7 +54,6 @@
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
@ -62,7 +61,6 @@
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@Ignore("Temporarily disabling the BackupNode unit test.")
public class TestBackupNode { public class TestBackupNode {
public static final Log LOG = LogFactory.getLog(TestBackupNode.class); public static final Log LOG = LogFactory.getLog(TestBackupNode.class);

View File

@ -1364,9 +1364,9 @@ public void testMultipleSecondaryNamenodes() throws IOException {
Configuration snConf1 = new HdfsConfiguration(cluster.getConfiguration(0)); Configuration snConf1 = new HdfsConfiguration(cluster.getConfiguration(0));
Configuration snConf2 = new HdfsConfiguration(cluster.getConfiguration(1)); Configuration snConf2 = new HdfsConfiguration(cluster.getConfiguration(1));
InetSocketAddress nn1RpcAddress = cluster.getNameNode(0) InetSocketAddress nn1RpcAddress = cluster.getNameNode(0)
.getServiceRpcAddress(); .getNameNodeAddress();
InetSocketAddress nn2RpcAddress = cluster.getNameNode(1) InetSocketAddress nn2RpcAddress = cluster.getNameNode(1)
.getServiceRpcAddress(); .getNameNodeAddress();
String nn1 = nn1RpcAddress.getHostName() + ":" + nn1RpcAddress.getPort(); String nn1 = nn1RpcAddress.getHostName() + ":" + nn1RpcAddress.getPort();
String nn2 = nn2RpcAddress.getHostName() + ":" + nn2RpcAddress.getPort(); String nn2 = nn2RpcAddress.getHostName() + ":" + nn2RpcAddress.getPort();
@ -1923,7 +1923,6 @@ public void testReformatNNBetweenCheckpoints() throws IOException {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(true).build(); .format(true).build();
int origPort = cluster.getNameNodePort(); int origPort = cluster.getNameNodePort();
int origServicePort = cluster.getNameNodeServicePort();
int origHttpPort = cluster.getNameNode().getHttpAddress().getPort(); int origHttpPort = cluster.getNameNode().getHttpAddress().getPort();
Configuration snnConf = new Configuration(conf); Configuration snnConf = new Configuration(conf);
File checkpointDir = new File(MiniDFSCluster.getBaseDirectory(), File checkpointDir = new File(MiniDFSCluster.getBaseDirectory(),
@ -1950,7 +1949,6 @@ public void testReformatNNBetweenCheckpoints() throws IOException {
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0) .numDataNodes(0)
.nameNodePort(origPort) .nameNodePort(origPort)
.nameNodeServicePort(origServicePort)
.nameNodeHttpPort(origHttpPort) .nameNodeHttpPort(origHttpPort)
.format(true).build(); .format(true).build();

View File

@ -661,15 +661,12 @@ public void testNNDirectorySize() throws Exception{
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
try{ try{
// Have to specify IPC ports so the NNs can talk to each other. // Have to specify IPC ports so the NNs can talk to each other.
int[] ports = ServerSocketUtil.getPorts(4); int[] ports = ServerSocketUtil.getPorts(2);
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1") .addNN(new MiniDFSNNTopology.NNConf("nn1").setIpcPort(ports[0]))
.setIpcPort(ports[0]) .addNN(
.setServicePort(ports[1])) new MiniDFSNNTopology.NNConf("nn2").setIpcPort(ports[1])));
.addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ports[2])
.setServicePort(ports[3])));
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology).numDataNodes(0) .nnTopology(topology).numDataNodes(0)

View File

@ -110,7 +110,6 @@ private NameNode makeNameNode(boolean enableMetricsLogging)
throws IOException { throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://localhost:0"); conf.set(FS_DEFAULT_NAME_KEY, "hdfs://localhost:0");
conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "0.0.0.0:0");
conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
conf.setInt(DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY, conf.setInt(DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY,
enableMetricsLogging ? 1 : 0); // If enabled, log early and log often enableMetricsLogging ? 1 : 0); // If enabled, log early and log often

View File

@ -125,8 +125,6 @@ public void testGenericKeysForNameNodeFormat()
// Set ephemeral ports // Set ephemeral ports
conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY,
"127.0.0.1:0"); "127.0.0.1:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
"127.0.0.1:0");
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY,
"127.0.0.1:0"); "127.0.0.1:0");

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;

View File

@ -171,18 +171,15 @@ private static void testStandbyTriggersLogRolls(int activeIndex)
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
try { try {
// Have to specify IPC ports so the NNs can talk to each other. // Have to specify IPC ports so the NNs can talk to each other.
int[] ports = ServerSocketUtil.getPorts(6); int[] ports = ServerSocketUtil.getPorts(3);
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1") .addNN(new MiniDFSNNTopology.NNConf("nn1")
.setIpcPort(ports[0]) .setIpcPort(ports[0]))
.setServicePort(ports[1]))
.addNN(new MiniDFSNNTopology.NNConf("nn2") .addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ports[2]) .setIpcPort(ports[1]))
.setServicePort(ports[3]))
.addNN(new MiniDFSNNTopology.NNConf("nn3") .addNN(new MiniDFSNNTopology.NNConf("nn3")
.setIpcPort(ports[4]) .setIpcPort(ports[2])));
.setServicePort(ports[5])));
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology) .nnTopology(topology)
@ -222,14 +219,11 @@ public void testTriggersLogRollsForAllStandbyNN() throws Exception {
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1") .addNN(new MiniDFSNNTopology.NNConf("nn1")
.setIpcPort(ServerSocketUtil.getPort(0, 100)) .setIpcPort(ServerSocketUtil.getPort(0, 100)))
.setServicePort(ServerSocketUtil.getPort(0, 100)))
.addNN(new MiniDFSNNTopology.NNConf("nn2") .addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ServerSocketUtil.getPort(0, 100)) .setIpcPort(ServerSocketUtil.getPort(0, 100)))
.setServicePort(ServerSocketUtil.getPort(0, 100)))
.addNN(new MiniDFSNNTopology.NNConf("nn3") .addNN(new MiniDFSNNTopology.NNConf("nn3")
.setIpcPort(ServerSocketUtil.getPort(0, 100)) .setIpcPort(ServerSocketUtil.getPort(0, 100))));
.setServicePort(ServerSocketUtil.getPort(0, 100))));
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology) .nnTopology(topology)

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.tools; package org.apache.hadoop.hdfs.tools;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -180,11 +179,9 @@ public void testGetAllServiceState() throws Exception {
Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol) Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol)
.getServiceStatus(); .getServiceStatus();
assertEquals(0, runTool("-getAllServiceState")); assertEquals(0, runTool("-getAllServiceState"));
assertOutputContains(String.format("%-50s %-10s", (HOST_A + ":" + assertOutputContains(String.format("%-50s %-10s", (HOST_A + ":" + 12345),
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT),
STANDBY_READY_RESULT.getState())); STANDBY_READY_RESULT.getState()));
assertOutputContains(String.format("%-50s %-10s", (HOST_B + ":" + assertOutputContains(String.format("%-50s %-10s", (HOST_B + ":" + 12345),
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT),
STANDBY_READY_RESULT.getState())); STANDBY_READY_RESULT.getState()));
} }

View File

@ -77,7 +77,7 @@ public void setup() throws IOException {
tool.setErrOut(new PrintStream(errOutBytes)); tool.setErrOut(new PrintStream(errOutBytes));
cluster.waitActive(); cluster.waitActive();
nn1Port = cluster.getNameNodeServicePort(0); nn1Port = cluster.getNameNodePort(0);
} }
@After @After

View File

@ -88,11 +88,9 @@ public void setup() throws Exception {
MiniDFSNNTopology topology = new MiniDFSNNTopology() MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1") .addNameservice(new MiniDFSNNTopology.NSConf("ns1")
.addNN(new MiniDFSNNTopology.NNConf("nn1") .addNN(new MiniDFSNNTopology.NNConf("nn1")
.setIpcPort(ServerSocketUtil.getPort(10021, 100)) .setIpcPort(ServerSocketUtil.getPort(10021, 100)))
.setServicePort(ServerSocketUtil.getPort(10025, 100)))
.addNN(new MiniDFSNNTopology.NNConf("nn2") .addNN(new MiniDFSNNTopology.NNConf("nn2")
.setIpcPort(ServerSocketUtil.getPort(10022, 100)) .setIpcPort(ServerSocketUtil.getPort(10022, 100))));
.setServicePort(ServerSocketUtil.getPort(10026, 100))));
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(topology) .nnTopology(topology)
.numDataNodes(0) .numDataNodes(0)

View File

@ -24,7 +24,6 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -122,13 +121,13 @@ private Map<String, Map<String, InetSocketAddress>> getAddressListFromConf(
TestType type, HdfsConfiguration conf) throws IOException { TestType type, HdfsConfiguration conf) throws IOException {
switch (type) { switch (type) {
case NAMENODE: case NAMENODE:
return DFSUtil.getNNServiceRpcAddresses(conf); return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
case BACKUP: case BACKUP:
return DFSUtil.getBackupNodeAddresses(conf); return DFSUtil.getBackupNodeAddresses(conf);
case SECONDARY: case SECONDARY:
return DFSUtil.getSecondaryNameNodeAddresses(conf); return DFSUtil.getSecondaryNameNodeAddresses(conf);
case NNRPCADDRESSES: case NNRPCADDRESSES:
return DFSUtil.getNNServiceRpcAddresses(conf); return DFSUtil.getNNServiceRpcAddressesForCluster(conf);
} }
return null; return null;
} }
@ -279,12 +278,10 @@ public void testInvalidArgument() throws Exception {
public void testNonFederation() throws Exception { public void testNonFederation() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(false); HdfsConfiguration conf = new HdfsConfiguration(false);
// Returned namenode address should match the default service address // Returned namenode address should match default address
conf.set(FS_DEFAULT_NAME_KEY, "hdfs://localhost:1000"); conf.set(FS_DEFAULT_NAME_KEY, "hdfs://localhost:1000");
verifyAddresses(conf, TestType.NAMENODE, false, "localhost:" + verifyAddresses(conf, TestType.NAMENODE, false, "localhost:1000");
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:1000");
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:" +
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
// Returned address should match backupnode RPC address // Returned address should match backupnode RPC address
conf.set(DFS_NAMENODE_BACKUP_ADDRESS_KEY,"localhost:1001"); conf.set(DFS_NAMENODE_BACKUP_ADDRESS_KEY,"localhost:1001");
@ -301,13 +298,11 @@ public void testNonFederation() throws Exception {
verifyAddresses(conf, TestType.NAMENODE, false, "localhost:1000"); verifyAddresses(conf, TestType.NAMENODE, false, "localhost:1000");
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:1000"); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:1000");
// Returned namenode address should match the default service address // Returned address should match RPC address
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "localhost:1001"); conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "localhost:1001");
verifyAddresses(conf, TestType.NAMENODE, false, "localhost:" + verifyAddresses(conf, TestType.NAMENODE, false, "localhost:1001");
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:1001");
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, "localhost:" +
DFS_NAMENODE_SERVICE_RPC_PORT_DEFAULT);
} }
/** /**
@ -335,6 +330,23 @@ public void testFederation() throws Exception {
verifyAddresses(conf, TestType.BACKUP, false, backupAddresses); verifyAddresses(conf, TestType.BACKUP, false, backupAddresses);
verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses); verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
// Test to ensure namenode, backup, secondary namenode addresses and
// namenode rpc addresses are returned from federation configuration.
// Returned namenode addresses are based on regular RPC address
// in the absence of service RPC address.
conf = new HdfsConfiguration(false);
setupNameServices(conf, nsCount);
nnAddresses = setupAddress(conf,
DFS_NAMENODE_RPC_ADDRESS_KEY, nsCount, 1000);
backupAddresses = setupAddress(conf,
DFS_NAMENODE_BACKUP_ADDRESS_KEY, nsCount, 2000);
secondaryAddresses = setupAddress(conf,
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, nsCount, 3000);
verifyAddresses(conf, TestType.NAMENODE, false, nnAddresses);
verifyAddresses(conf, TestType.BACKUP, false, backupAddresses);
verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
} }
@Test(timeout=10000) @Test(timeout=10000)