diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
index fe4f939d6b..38f63e5367 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -94,8 +94,9 @@ public class NamenodeHeartbeatService extends PeriodicService {
    */
   public NamenodeHeartbeatService(
       ActiveNamenodeResolver resolver, String nsId, String nnId) {
-    super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
-        nnId);
+    super(NamenodeHeartbeatService.class.getSimpleName() +
+        (nsId == null ? "" : " " + nsId) +
+        (nnId == null ? "" : " " + nnId));
 
     this.resolver = resolver;
 
@@ -109,28 +110,28 @@ protected void serviceInit(Configuration configuration) throws Exception {
 
     this.conf = configuration;
 
+    String nnDesc = nameserviceId;
     if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
       this.localTarget = new NNHAServiceTarget(
           conf, nameserviceId, namenodeId);
+      nnDesc += "-" + namenodeId;
     } else {
       this.localTarget = null;
     }
 
     // Get the RPC address for the clients to connect
     this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
-    LOG.info("{}-{} RPC address: {}",
-        nameserviceId, namenodeId, rpcAddress);
+    LOG.info("{} RPC address: {}", nnDesc, rpcAddress);
 
     // Get the Service RPC address for monitoring
     this.serviceAddress =
         DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
     if (this.serviceAddress == null) {
-      LOG.error("Cannot locate RPC service address for NN {}-{}, " +
-          "using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
+      LOG.error("Cannot locate RPC service address for NN {}, " +
+          "using RPC address {}", nnDesc, this.rpcAddress);
       this.serviceAddress = this.rpcAddress;
     }
-    LOG.info("{}-{} Service RPC address: {}",
-        nameserviceId, namenodeId, serviceAddress);
+    LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress);
 
     // Get the Lifeline RPC address for faster monitoring
     this.lifelineAddress =
@@ -138,13 +139,12 @@ protected void serviceInit(Configuration configuration) throws Exception {
     if (this.lifelineAddress == null) {
       this.lifelineAddress = this.serviceAddress;
     }
-    LOG.info("{}-{} Lifeline RPC address: {}",
-        nameserviceId, namenodeId, lifelineAddress);
+    LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress);
 
     // Get the Web address for UI
     this.webAddress =
         DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
-    LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
+    LOG.info("{} Web address: {}", nnDesc, webAddress);
 
     this.setIntervalMs(conf.getLong(
         DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
@@ -173,7 +173,7 @@ private static String getRpcAddress(
     String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
     String ret = conf.get(confKey);
 
-    if (nsId != null && nnId != null) {
+    if (nsId != null || nnId != null) {
       // Get if for the proper nameservice and namenode
       confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
       ret = conf.get(confKey);
@@ -182,10 +182,16 @@ private static String getRpcAddress(
       if (ret == null) {
         Map<String, InetSocketAddress> rpcAddresses =
             DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
-        if (rpcAddresses.containsKey(nnId)) {
-          InetSocketAddress sockAddr = rpcAddresses.get(nnId);
+        InetSocketAddress sockAddr = null;
+        if (nnId != null) {
+          sockAddr = rpcAddresses.get(nnId);
+        } else if (rpcAddresses.size() == 1) {
+          // Get the only namenode in the namespace
+          sockAddr = rpcAddresses.values().iterator().next();
+        }
+        if (sockAddr != null) {
           InetAddress addr = sockAddr.getAddress();
-          ret = addr.getHostAddress() + ":" + sockAddr.getPort();
+          ret = addr.getHostName() + ":" + sockAddr.getPort();
         }
       }
     }
@@ -279,9 +285,14 @@ protected NamenodeStatusReport getNamenodeStatusReport() {
           HAServiceStatus status = haProtocol.getServiceStatus();
           report.setHAServiceState(status.getState());
         } catch (Throwable e) {
-          // Failed to fetch HA status, ignoring failure
-          LOG.error("Cannot fetch HA status for {}: {}",
-              getNamenodeDesc(), e.getMessage(), e);
+          if (e.getMessage().startsWith("HA for namenode is not enabled")) {
+            LOG.error("HA for {} is not enabled", getNamenodeDesc());
+            localTarget = null;
+          } else {
+            // Failed to fetch HA status, ignoring failure
+            LOG.error("Cannot fetch HA status for {}: {}",
+                getNamenodeDesc(), e.getMessage(), e);
+          }
         }
       }
     } catch(IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 1ee49d566a..e79674d5e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -23,6 +23,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY;
@@ -31,6 +33,7 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
@@ -136,8 +139,7 @@ public class RouterContext {
     private RouterClient adminClient;
     private URI fileSystemUri;
 
-    public RouterContext(Configuration conf, String nsId, String nnId)
-        throws URISyntaxException {
+    public RouterContext(Configuration conf, String nsId, String nnId) {
       this.conf = conf;
       this.nameserviceId = nsId;
       this.namenodeId = nnId;
@@ -397,10 +399,14 @@ public Configuration generateNamenodeConfiguration(String nsId) {
 
         conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
             "127.0.0.1:" + context.rpcPort);
+        conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix,
+            "127.0.0.1:" + context.servicePort);
         conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
             "127.0.0.1:" + context.httpPort);
         conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix, "0.0.0.0");
+        conf.set(DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY + "." + suffix,
+            "0.0.0.0");
       }
     }
 
@@ -457,6 +463,19 @@ public Configuration generateRouterConfiguration(String nsId, String nnId) {
       conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
     }
 
+    // Namenodes to monitor
+    StringBuilder sb = new StringBuilder();
+    for (String ns : this.nameservices) {
+      for (NamenodeContext context : getNamenodes(ns)) {
+        String suffix = context.getConfSuffix();
+        if (sb.length() != 0) {
+          sb.append(",");
+        }
+        sb.append(suffix);
+      }
+    }
+    conf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
+
     // Add custom overrides if available
     if (this.routerOverrides != null) {
       for (Entry entry : this.routerOverrides) {