diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index f0d9168a36..45868a8222 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.VersionInfo;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
@@ -55,9 +57,12 @@ private FederationUtil() {
    *
    * @param beanQuery JMX bean.
    * @param webAddress Web address of the JMX endpoint.
+   * @param connectionFactory to open http/https connection.
+   * @param scheme to use for URL connection.
    * @return JSON with the JMX data
    */
-  public static JSONArray getJmx(String beanQuery, String webAddress) {
+  public static JSONArray getJmx(String beanQuery, String webAddress,
+      URLConnectionFactory connectionFactory, String scheme) {
     JSONArray ret = null;
     BufferedReader reader = null;
     try {
@@ -68,8 +73,11 @@ public static JSONArray getJmx(String beanQuery, String webAddress) {
         host = webAddressSplit[0];
         port = Integer.parseInt(webAddressSplit[1]);
       }
-      URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
-      URLConnection conn = jmxURL.openConnection();
+      URL jmxURL = new URL(scheme, host, port, "/jmx?qry=" + beanQuery);
+      LOG.debug("JMX URL: {}", jmxURL);
+      // Create a URL connection
+      URLConnection conn = connectionFactory.openConnection(
+          jmxURL, UserGroupInformation.isSecurityEnabled());
       conn.setConnectTimeout(5 * 1000);
       conn.setReadTimeout(5 * 1000);
       InputStream in = conn.getInputStream();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
index 82b5609a86..d50a5fcdf4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
 import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
@@ -86,7 +87,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
   private String lifelineAddress;
   /** HTTP address for the namenode. */
   private String webAddress;
-
+  /** Connection factory for JMX calls. */
+  private URLConnectionFactory connectionFactory;
+  /** URL scheme to use for JMX calls. */
+  private String scheme;
   /**
    * Create a new Namenode status updater.
    * @param resolver Namenode resolver service to handle NN registration.
@@ -147,6 +151,12 @@ protected void serviceInit(Configuration configuration) throws Exception {
         DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
     LOG.info("{} Web address: {}", nnDesc, webAddress);
 
+    this.connectionFactory =
+        URLConnectionFactory.newDefaultURLConnectionFactory(conf);
+
+    this.scheme =
+        DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";
+
     this.setIntervalMs(conf.getLong(
         DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
         DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
@@ -329,7 +339,8 @@ private void updateJMXParameters(
     try {
       // TODO part of this should be moved to its own utility
       String query = "Hadoop:service=NameNode,name=FSNamesystem*";
-      JSONArray aux = FederationUtil.getJmx(query, address);
+      JSONArray aux = FederationUtil.getJmx(
+          query, address, connectionFactory, scheme);
       if (aux != null) {
         for (int i = 0; i < aux.length(); i++) {
           JSONObject jsonObject = aux.getJSONObject(i);
@@ -364,4 +375,14 @@ private void updateJMXParameters(
       LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
     }
   }
-}
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
+        this.nameserviceId, this.namenodeId);
+    if (this.connectionFactory != null) {
+      this.connectionFactory.destroy();
+    }
+    super.serviceStop();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 8fa3506f73..9fcfcb4ae3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -20,6 +20,7 @@
 import static java.util.Arrays.asList;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,6 +33,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -40,8 +42,10 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -264,4 +268,50 @@ private static void assertNamenodeHeartbeatService(
     assertTrue(actualSet + " does not contain all " + expected,
         actualSet.containsAll(expected));
   }
+
+  @Test
+  public void testJmxUrlHTTP() {
+    verifyUrlSchemes(HttpConfig.Policy.HTTP_ONLY.name());
+  }
+
+  @Test
+  public void testJmxUrlHTTPs() {
+    verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
+  }
+
+  private void verifyUrlSchemes(String scheme) {
+
+    // Attach our own log appender so we can verify output
+    final LogVerificationAppender appender =
+        new LogVerificationAppender();
+    final org.apache.log4j.Logger logger =
+        org.apache.log4j.Logger.getRootLogger();
+    logger.addAppender(appender);
+    logger.setLevel(Level.DEBUG);
+
+    // Setup and start the Router
+    Configuration conf = getNamenodesConfig();
+    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
+    Configuration routerConf = new RouterConfigBuilder(conf)
+        .heartbeat(true)
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn0");
+    router = new Router();
+    router.init(routerConf);
+
+    // Test the heartbeat services of the Router
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
+    for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
+      heartbeatService.getNamenodeStatusReport();
+    }
+    if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
+      assertEquals(1, appender.countLinesWithMessage("JMX URL: https://"));
+      assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
+    } else {
+      assertEquals(1, appender.countLinesWithMessage("JMX URL: http://"));
+      assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
+    }
+  }
 }
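
For context, below is a minimal caller-side sketch of the new getJmx() overload. It is not part of the patch: the wrapper class, its main method, and the web address "nn.example.com:9870" are illustrative placeholders, and it assumes FederationUtil and its getJmx() method remain publicly accessible as shown above.

    // Illustrative sketch only: exercises FederationUtil.getJmx() the same way
    // NamenodeHeartbeatService does after this change.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;
    import org.codehaus.jettison.json.JSONArray;

    public class JmxOverHttpsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();

        // Factory built from the configuration, so HTTPS connections pick up
        // the cluster SSL settings (mirrors serviceInit in the patch).
        URLConnectionFactory connectionFactory =
            URLConnectionFactory.newDefaultURLConnectionFactory(conf);

        // Same scheme selection as the patch: HTTP if enabled, otherwise HTTPS.
        String scheme =
            DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";

        // Query FSNamesystem beans from a NameNode web address (placeholder).
        JSONArray beans = FederationUtil.getJmx(
            "Hadoop:service=NameNode,name=FSNamesystem*",
            "nn.example.com:9870", connectionFactory, scheme);
        if (beans != null) {
          for (int i = 0; i < beans.length(); i++) {
            System.out.println(beans.getJSONObject(i));
          }
        }

        // Release the factory's resources, as the new serviceStop() does.
        connectionFactory.destroy();
      }
    }

Note on the design choice: since HttpConfig.Policy.isHttpEnabled() is also true for HTTP_AND_HTTPS, the ternary above (and in the patch) resolves that policy to "http".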