diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index efddc0e5c9..028ba7175b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -739,6 +739,23 @@ public static String getHostname() { public static String getHostPortString(InetSocketAddress addr) { return addr.getHostName() + ":" + addr.getPort(); } + + /** + * Get port as integer from host port string like host:port. + * + * @param addr host + port string like host:port. + * @return an integer value representing the port. + * @throws IllegalArgumentException if the input is not in the correct format. + */ + public static int getPortFromHostPortString(String addr) + throws IllegalArgumentException { + String[] hostport = addr.split(":"); + if (hostport.length != 2) { + String errorMsg = "Address should be :, but it is " + addr; + throw new IllegalArgumentException(errorMsg); + } + return Integer.parseInt(hostport[1]); + } /** * Checks if {@code host} is a local host name and return {@link InetAddress} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java index 0bf2c4473a..ad5d8d7fa8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.security.KerberosAuthException; import org.apache.hadoop.security.NetUtilsTestResolver; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; @@ -765,6 +766,18 @@ public void testTrimCreateSocketAddress() { assertEquals(defaultAddr.trim(), NetUtils.getHostPortString(addr)); } + @Test + public void testGetPortFromHostPortString() throws Exception { + + assertEquals(1002, NetUtils.getPortFromHostPortString("testHost:1002")); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> NetUtils.getPortFromHostPortString("testHost")); + + LambdaTestUtils.intercept(IllegalArgumentException.class, + () -> NetUtils.getPortFromHostPortString("testHost:randomString")); + } + @Test public void testBindToLocalAddress() throws Exception { assertNotNull(NetUtils diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java index 6b3fa280ab..6f54de05eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java @@ -426,37 +426,60 @@ static Map getResolvedAddressesForNsId( Collection nnIds = getNameNodeIds(conf, nsId); Map ret = Maps.newLinkedHashMap(); for (String nnId : emptyAsSingletonNull(nnIds)) { - String suffix = concatSuffixes(nsId, nnId); - String address = checkKeysAndProcess(defaultValue, suffix, conf, keys); - if (address != null) { - InetSocketAddress isa = NetUtils.createSocketAddr(address); - try { - // Datanode should just use FQDN - String[] resolvedHostNames = dnr - .getAllResolvedHostnameByDomainName(isa.getHostName(), true); - int port = isa.getPort(); - for (String hostname : resolvedHostNames) { - InetSocketAddress inetSocketAddress = new InetSocketAddress( - hostname, port); - // Concat nn info with host info to make uniq ID - String concatId; - if (nnId == null || nnId.isEmpty()) { - concatId = String - .join("-", nsId, hostname, String.valueOf(port)); - } else { - concatId = String - .join("-", nsId, nnId, hostname, String.valueOf(port)); - } - ret.put(concatId, inetSocketAddress); - } - } catch (UnknownHostException e) { - LOG.error("Failed to resolve address: " + address); + Map resolvedAddressesForNnId = + getResolvedAddressesForNnId(conf, nsId, nnId, dnr, defaultValue, keys); + ret.putAll(resolvedAddressesForNnId); + } + return ret; + } + + public static Map getResolvedAddressesForNnId( + Configuration conf, String nsId, String nnId, + DomainNameResolver dnr, String defaultValue, + String... keys) { + String suffix = concatSuffixes(nsId, nnId); + String address = checkKeysAndProcess(defaultValue, suffix, conf, keys); + Map ret = Maps.newLinkedHashMap(); + if (address != null) { + InetSocketAddress isa = NetUtils.createSocketAddr(address); + try { + String[] resolvedHostNames = dnr + .getAllResolvedHostnameByDomainName(isa.getHostName(), true); + int port = isa.getPort(); + for (String hostname : resolvedHostNames) { + InetSocketAddress inetSocketAddress = new InetSocketAddress( + hostname, port); + // Concat nn info with host info to make uniq ID + String concatId = getConcatNnId(nsId, nnId, hostname, port); + ret.put(concatId, inetSocketAddress); } + } catch (UnknownHostException e) { + LOG.error("Failed to resolve address: {}", address); } } return ret; } + /** + * Concat nn info with host info to make uniq ID. + * This is mainly used when configured nn is + * a domain record that has multiple hosts behind it. + * + * @param nsId nsId to be concatenated to a uniq ID. + * @param nnId nnId to be concatenated to a uniq ID. + * @param hostname hostname to be concatenated to a uniq ID. + * @param port port to be concatenated to a uniq ID. + * @return Concatenated uniq id. + */ + private static String getConcatNnId(String nsId, String nnId, String hostname, int port) { + if (nnId == null || nnId.isEmpty()) { + return String + .join("-", nsId, hostname, String.valueOf(port)); + } + return String + .join("-", nsId, nnId, hostname, String.valueOf(port)); + } + /** * Returns the configured address for all NameNodes in the cluster. * @param conf configuration diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java index fb8cc693c6..1afce83a7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.tools.DFSHAAdmin; import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; import org.apache.hadoop.hdfs.web.URLConnectionFactory; +import org.apache.hadoop.net.NetUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -94,6 +95,9 @@ public class NamenodeHeartbeatService extends PeriodicService { private URLConnectionFactory connectionFactory; /** URL scheme to use for JMX calls. */ private String scheme; + + private String resolvedHost; + private String originalNnId; /** * Create a new Namenode status updater. * @param resolver Namenode resolver service to handle NN registration. @@ -110,6 +114,28 @@ public NamenodeHeartbeatService( this.nameserviceId = nsId; this.namenodeId = nnId; + } + + /** + * Create a new Namenode status updater. + * + * @param resolver Namenode resolver service to handle NN registration. + * @param nsId Identifier of the nameservice. + * @param nnId Identifier of the namenode in HA. + * @param resolvedHost resolvedHostname for this specific namenode. + */ + public NamenodeHeartbeatService( + ActiveNamenodeResolver resolver, String nsId, String nnId, String resolvedHost) { + super(getNnHeartBeatServiceName(nsId, nnId)); + + this.resolver = resolver; + + this.nameserviceId = nsId; + // Concat a uniq id from original nnId and resolvedHost + this.namenodeId = nnId + "-" + resolvedHost; + this.resolvedHost = resolvedHost; + // Same the original nnid to get the ports from config. + this.originalNnId = nnId; } @@ -120,40 +146,59 @@ protected void serviceInit(Configuration configuration) throws Exception { String nnDesc = nameserviceId; if (this.namenodeId != null && !this.namenodeId.isEmpty()) { - this.localTarget = new NNHAServiceTarget( - conf, nameserviceId, namenodeId); nnDesc += "-" + namenodeId; } else { this.localTarget = null; } + if (originalNnId == null) { + originalNnId = namenodeId; + } // Get the RPC address for the clients to connect - this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); - LOG.info("{} RPC address: {}", nnDesc, rpcAddress); + this.rpcAddress = getRpcAddress(conf, nameserviceId, originalNnId); // Get the Service RPC address for monitoring this.serviceAddress = - DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); + DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, originalNnId); if (this.serviceAddress == null) { LOG.error("Cannot locate RPC service address for NN {}, " + "using RPC address {}", nnDesc, this.rpcAddress); this.serviceAddress = this.rpcAddress; } - LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress); // Get the Lifeline RPC address for faster monitoring this.lifelineAddress = - DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId); + DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, originalNnId); if (this.lifelineAddress == null) { this.lifelineAddress = this.serviceAddress; } - LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress); // Get the Web address for UI this.webAddress = - DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); + DFSUtil.getNamenodeWebAddr(conf, nameserviceId, originalNnId); + + if (resolvedHost != null) { + // Get the addresses from resolvedHost plus the configured ports. + rpcAddress = resolvedHost + ":" + + NetUtils.getPortFromHostPortString(rpcAddress); + serviceAddress = resolvedHost + ":" + + NetUtils.getPortFromHostPortString(serviceAddress); + lifelineAddress = resolvedHost + ":" + + NetUtils.getPortFromHostPortString(lifelineAddress); + webAddress = resolvedHost + ":" + + NetUtils.getPortFromHostPortString(webAddress); + } + + LOG.info("{} RPC address: {}", nnDesc, rpcAddress); + LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress); + LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress); LOG.info("{} Web address: {}", nnDesc, webAddress); + if (this.namenodeId != null && !this.namenodeId.isEmpty()) { + this.localTarget = new NNHAServiceTarget( + conf, nameserviceId, namenodeId, serviceAddress, lifelineAddress); + } + this.connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf); @@ -336,6 +381,12 @@ public String getNamenodeDesc() { } } + private static String getNnHeartBeatServiceName(String nsId, String nnId) { + return NamenodeHeartbeatService.class.getSimpleName() + + (nsId == null ? "" : " " + nsId) + + (nnId == null ? "" : " " + nnId); + } + /** * Get the parameters for a Namenode from JMX and add them to the report. * @param address Web interface of the Namenode to monitor. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 4777da4700..741e470c6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -98,6 +98,12 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { TimeUnit.SECONDS.toMillis(5); public static final String DFS_ROUTER_MONITOR_NAMENODE = FEDERATION_ROUTER_PREFIX + "monitor.namenode"; + public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED = + FEDERATION_ROUTER_PREFIX + "monitor.namenode.nameservice.resolution-enabled"; + public static final boolean + DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED_DEFAULT = false; + public static final String DFS_ROUTER_MONITOR_NAMENODE_RESOLVER_IMPL + = FEDERATION_ROUTER_PREFIX + "monitor.namenode.nameservice.resolver.impl"; public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE = FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable"; public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 01b71ffd4f..0bf53bbb89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_HOSTNAME_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY; @@ -36,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.common.TokenVerifier; @@ -48,9 +51,12 @@ import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.net.DomainNameResolver; +import org.apache.hadoop.net.DomainNameResolverFactory; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -534,10 +540,34 @@ public void verifyToken(DelegationTokenIdentifier tokenId, byte[] password) LOG.error("Wrong Namenode to monitor: {}", namenode); } if (nsId != null) { - NamenodeHeartbeatService heartbeatService = - createNamenodeHeartbeatService(nsId, nnId); - if (heartbeatService != null) { - ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); + String configKeyWithHost = + RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED + "." + nsId; + boolean resolveNeeded = conf.getBoolean(configKeyWithHost, + RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED_DEFAULT); + + if (nnId != null && resolveNeeded) { + DomainNameResolver dnr = DomainNameResolverFactory.newInstance( + conf, nsId, RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE_RESOLVER_IMPL); + + Map hosts = Maps.newLinkedHashMap(); + Map resolvedHosts = + DFSUtilClient.getResolvedAddressesForNnId(conf, nsId, nnId, dnr, + null, DFS_NAMENODE_RPC_ADDRESS_KEY, + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY); + hosts.putAll(resolvedHosts); + for (InetSocketAddress isa : hosts.values()) { + NamenodeHeartbeatService heartbeatService = + createNamenodeHeartbeatService(nsId, nnId, isa.getHostName()); + if (heartbeatService != null) { + ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); + } + } + } else { + NamenodeHeartbeatService heartbeatService = + createNamenodeHeartbeatService(nsId, nnId); + if (heartbeatService != null) { + ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); + } } } } @@ -586,6 +616,16 @@ protected NamenodeHeartbeatService createNamenodeHeartbeatService( return ret; } + protected NamenodeHeartbeatService createNamenodeHeartbeatService( + String nsId, String nnId, String resolvedHost) { + + LOG.info("Creating heartbeat service for" + + " Namenode {}, resolved host {}, in {}", nnId, resolvedHost, nsId); + NamenodeHeartbeatService ret = new NamenodeHeartbeatService( + namenodeResolver, nsId, nnId, resolvedHost); + return ret; + } + ///////////////////////////////////////////////////////// // Router State Management ///////////////////////////////////////////////////////// diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 2c397d2d76..a1d5fe3258 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -462,6 +462,26 @@ + + dfs.federation.router.monitor.namenode.nameservice.resolution-enabled + false + + Determines if the given monitored namenode address is a domain name which needs to + be resolved. + This is used by router to resolve namenodes. + + + + + dfs.federation.router.monitor.namenode.nameservice.resolver.impl + + + Nameservice resolver implementation used by router. + Effective with + dfs.federation.router.monitor.namenode.nameservices.resolution-enabled on. + + + dfs.federation.router.monitor.localnamenode.enable true diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java index 38419ed849..9bf149c649 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.server.federation.router; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; @@ -28,15 +31,19 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.federation.MockResolver; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.net.MockDomainNameResolver; import org.apache.hadoop.service.Service.STATE; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -203,4 +210,64 @@ public void testHearbeat() throws InterruptedException, IOException { standby = normalNss.get(1); assertEquals(NAMENODES[1], standby.getNamenodeId()); } + + @Test + public void testNamenodeHeartbeatServiceNNResolution() { + String nsId = "test-ns"; + String nnId = "nn"; + int rpcPort = 1000; + int servicePort = 1001; + int lifelinePort = 1002; + int webAddressPort = 1003; + Configuration conf = generateNamenodeConfiguration(nsId, nnId, + rpcPort, servicePort, lifelinePort, webAddressPort); + + Router testRouter = new Router(); + testRouter.setConf(conf); + + Collection heartbeatServices = + testRouter.createNamenodeHeartbeatServices(); + + assertEquals(2, heartbeatServices.size()); + + Iterator iterator = heartbeatServices.iterator(); + NamenodeHeartbeatService service = iterator.next(); + service.init(conf); + assertEquals("test-ns-nn-host01.test:host01.test:1001", + service.getNamenodeDesc()); + + service = iterator.next(); + service.init(conf); + assertEquals("test-ns-nn-host02.test:host02.test:1001", + service.getNamenodeDesc()); + + } + + private Configuration generateNamenodeConfiguration( + String nsId, String nnId, + int rpcPort, int servicePort, + int lifelinePort, int webAddressPort) { + Configuration conf = new HdfsConfiguration(); + String suffix = nsId + "." + nnId; + + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, false); + conf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, nsId + "." + nnId); + + conf.setBoolean( + RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE_RESOLUTION_ENABLED + "." + nsId, true); + conf.set( + RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE_RESOLVER_IMPL + "." + nsId, + MockDomainNameResolver.class.getName()); + + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix, + MockDomainNameResolver.DOMAIN + ":" + rpcPort); + conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + "." + suffix, + MockDomainNameResolver.DOMAIN + ":" + servicePort); + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY + "." + suffix, + MockDomainNameResolver.DOMAIN + ":" + lifelinePort); + conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix, + MockDomainNameResolver.DOMAIN + ":" + webAddressPort); + + return conf; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java index 94aff53470..78a1bc08f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/NNHAServiceTarget.java @@ -53,17 +53,69 @@ public class NNHAServiceTarget extends HAServiceTarget { private InetSocketAddress zkfcAddr; private NodeFencer fencer; private BadFencingConfigurationException fenceConfigError; - private final String nnId; - private final String nsId; - private final boolean autoFailoverEnabled; - + private HdfsConfiguration targetConf; + private String nnId; + private String nsId; + private boolean autoFailoverEnabled; + + /** + * Create a NNHAServiceTarget for a namenode. + * Look up addresses from configuration. + * + * @param conf HDFS configuration. + * @param nsId nsId of this nn. + * @param nnId nnId of this nn. + */ public NNHAServiceTarget(Configuration conf, String nsId, String nnId) { - Preconditions.checkNotNull(nnId); + initializeNnConfig(conf, nsId, nnId); - if (nsId == null) { - nsId = DFSUtil.getOnlyNameServiceIdOrNull(conf); - if (nsId == null) { + String serviceAddr = + DFSUtil.getNamenodeServiceAddr(targetConf, nsId, nnId); + if (serviceAddr == null) { + throw new IllegalArgumentException( + "Unable to determine service address for namenode '" + nnId + "'"); + } + + this.addr = NetUtils.createSocketAddr(serviceAddr, + HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); + + String lifelineAddrStr = + DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId); + this.lifelineAddr = (lifelineAddrStr != null) ? + NetUtils.createSocketAddr(lifelineAddrStr) : null; + + initializeFailoverConfig(); + } + + /** + * Create a NNHAServiceTarget for a namenode. + * Addresses are provided so we don't need to lookup the config. + * + * @param conf HDFS configuration. + * @param nsId nsId of this nn. + * @param nnId nnId of this nn. + * @param addr Provided service address. + * @param lifelineAddr Provided lifeline address. + */ + public NNHAServiceTarget(Configuration conf, + String nsId, String nnId, + String addr, String lifelineAddr) { + initializeNnConfig(conf, nsId, nnId); + + this.addr = NetUtils.createSocketAddr(addr); + this.lifelineAddr = NetUtils.createSocketAddr(lifelineAddr); + + initializeFailoverConfig(); + } + + private void initializeNnConfig(Configuration conf, + String providedNsId, String providedNnId) { + Preconditions.checkNotNull(providedNnId); + + if (providedNsId == null) { + providedNsId = DFSUtil.getOnlyNameServiceIdOrNull(conf); + if (providedNsId == null) { String errorString = "Unable to determine the name service ID."; String[] dfsNames = conf.getStrings(DFS_NAMESERVICES); if ((dfsNames != null) && (dfsNames.length > 1)) { @@ -75,27 +127,17 @@ public NNHAServiceTarget(Configuration conf, throw new IllegalArgumentException(errorString); } } - assert nsId != null; - + // Make a copy of the conf, and override configs based on the // target node -- not the node we happen to be running on. - HdfsConfiguration targetConf = new HdfsConfiguration(conf); - NameNode.initializeGenericKeys(targetConf, nsId, nnId); - - String serviceAddr = - DFSUtil.getNamenodeServiceAddr(targetConf, nsId, nnId); - if (serviceAddr == null) { - throw new IllegalArgumentException( - "Unable to determine service address for namenode '" + nnId + "'"); - } - this.addr = NetUtils.createSocketAddr(serviceAddr, - HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT); + this.targetConf = new HdfsConfiguration(conf); + NameNode.initializeGenericKeys(targetConf, providedNsId, providedNnId); - String lifelineAddrStr = - DFSUtil.getNamenodeLifelineAddr(targetConf, nsId, nnId); - this.lifelineAddr = (lifelineAddrStr != null) ? - NetUtils.createSocketAddr(lifelineAddrStr) : null; + this.nsId = providedNsId; + this.nnId = providedNnId; + } + private void initializeFailoverConfig() { this.autoFailoverEnabled = targetConf.getBoolean( DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT); @@ -105,16 +147,13 @@ public NNHAServiceTarget(Configuration conf, setZkfcPort(port); } } - + try { this.fencer = NodeFencer.create(targetConf, DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY); } catch (BadFencingConfigurationException e) { this.fenceConfigError = e; } - - this.nnId = nnId; - this.nsId = nsId; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java index ab7e0afbca..b1c2cd83c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestNNHealthCheck.java @@ -21,6 +21,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -79,6 +81,19 @@ public void testNNHealthCheckWithLifelineAddress() throws IOException { doNNHealthCheckTest(); } + @Test + public void testNNHAServiceTargetWithProvidedAddr() { + conf.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.1:1"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, "0.0.0.1:2"); + + // Test constructor with provided address. + NNHAServiceTarget target = new NNHAServiceTarget(conf, "ns", "nn1", + "0.0.0.0:1", "0.0.0.0:2"); + + assertEquals("/0.0.0.0:1", target.getAddress().toString()); + assertEquals("/0.0.0.0:2", target.getHealthMonitorAddress().toString()); + } + @Test public void testNNHealthCheckWithSafemodeAsUnhealthy() throws Exception { conf.setBoolean(DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE, true);