HDFS-13955. RBF: Support secure Namenode in NamenodeHeartbeatService. Contributed by CR Hota.
This commit is contained in:
parent
b062dd462d
commit
ddbe08db33
@ -31,6 +31,8 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||||
|
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.codehaus.jettison.json.JSONArray;
|
import org.codehaus.jettison.json.JSONArray;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
@ -55,9 +57,12 @@ private FederationUtil() {
|
|||||||
*
|
*
|
||||||
* @param beanQuery JMX bean.
|
* @param beanQuery JMX bean.
|
||||||
* @param webAddress Web address of the JMX endpoint.
|
* @param webAddress Web address of the JMX endpoint.
|
||||||
|
* @param connectionFactory to open http/https connection.
|
||||||
|
* @param scheme to use for URL connection.
|
||||||
* @return JSON with the JMX data
|
* @return JSON with the JMX data
|
||||||
*/
|
*/
|
||||||
public static JSONArray getJmx(String beanQuery, String webAddress) {
|
public static JSONArray getJmx(String beanQuery, String webAddress,
|
||||||
|
URLConnectionFactory connectionFactory, String scheme) {
|
||||||
JSONArray ret = null;
|
JSONArray ret = null;
|
||||||
BufferedReader reader = null;
|
BufferedReader reader = null;
|
||||||
try {
|
try {
|
||||||
@ -68,8 +73,11 @@ public static JSONArray getJmx(String beanQuery, String webAddress) {
|
|||||||
host = webAddressSplit[0];
|
host = webAddressSplit[0];
|
||||||
port = Integer.parseInt(webAddressSplit[1]);
|
port = Integer.parseInt(webAddressSplit[1]);
|
||||||
}
|
}
|
||||||
URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
|
URL jmxURL = new URL(scheme, host, port, "/jmx?qry=" + beanQuery);
|
||||||
URLConnection conn = jmxURL.openConnection();
|
LOG.debug("JMX URL: {}", jmxURL);
|
||||||
|
// Create a URL connection
|
||||||
|
URLConnection conn = connectionFactory.openConnection(
|
||||||
|
jmxURL, UserGroupInformation.isSecurityEnabled());
|
||||||
conn.setConnectTimeout(5 * 1000);
|
conn.setConnectTimeout(5 * 1000);
|
||||||
conn.setReadTimeout(5 * 1000);
|
conn.setReadTimeout(5 * 1000);
|
||||||
InputStream in = conn.getInputStream();
|
InputStream in = conn.getInputStream();
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
|
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
|
||||||
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
|
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
|
||||||
|
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||||
import org.codehaus.jettison.json.JSONArray;
|
import org.codehaus.jettison.json.JSONArray;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -86,7 +87,10 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
|||||||
private String lifelineAddress;
|
private String lifelineAddress;
|
||||||
/** HTTP address for the namenode. */
|
/** HTTP address for the namenode. */
|
||||||
private String webAddress;
|
private String webAddress;
|
||||||
|
/** Connection factory for JMX calls. */
|
||||||
|
private URLConnectionFactory connectionFactory;
|
||||||
|
/** URL scheme to use for JMX calls. */
|
||||||
|
private String scheme;
|
||||||
/**
|
/**
|
||||||
* Create a new Namenode status updater.
|
* Create a new Namenode status updater.
|
||||||
* @param resolver Namenode resolver service to handle NN registration.
|
* @param resolver Namenode resolver service to handle NN registration.
|
||||||
@ -147,6 +151,12 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
|
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
|
||||||
LOG.info("{} Web address: {}", nnDesc, webAddress);
|
LOG.info("{} Web address: {}", nnDesc, webAddress);
|
||||||
|
|
||||||
|
this.connectionFactory =
|
||||||
|
URLConnectionFactory.newDefaultURLConnectionFactory(conf);
|
||||||
|
|
||||||
|
this.scheme =
|
||||||
|
DFSUtil.getHttpPolicy(conf).isHttpEnabled() ? "http" : "https";
|
||||||
|
|
||||||
this.setIntervalMs(conf.getLong(
|
this.setIntervalMs(conf.getLong(
|
||||||
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
|
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
|
||||||
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
|
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
|
||||||
@ -329,7 +339,8 @@ private void updateJMXParameters(
|
|||||||
try {
|
try {
|
||||||
// TODO part of this should be moved to its own utility
|
// TODO part of this should be moved to its own utility
|
||||||
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
|
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
|
||||||
JSONArray aux = FederationUtil.getJmx(query, address);
|
JSONArray aux = FederationUtil.getJmx(
|
||||||
|
query, address, connectionFactory, scheme);
|
||||||
if (aux != null) {
|
if (aux != null) {
|
||||||
for (int i = 0; i < aux.length(); i++) {
|
for (int i = 0; i < aux.length(); i++) {
|
||||||
JSONObject jsonObject = aux.getJSONObject(i);
|
JSONObject jsonObject = aux.getJSONObject(i);
|
||||||
@ -364,4 +375,14 @@ private void updateJMXParameters(
|
|||||||
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
|
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
LOG.info("Stopping NamenodeHeartbeat service for, NS {} NN {} ",
|
||||||
|
this.nameserviceId, this.namenodeId);
|
||||||
|
if (this.connectionFactory != null) {
|
||||||
|
this.connectionFactory.destroy();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
}
|
}
|
@ -20,6 +20,7 @@
|
|||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -32,6 +33,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.LogVerificationAppender;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
|
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
@ -40,8 +42,10 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||||
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -264,4 +268,50 @@ private static void assertNamenodeHeartbeatService(
|
|||||||
assertTrue(actualSet + " does not contain all " + expected,
|
assertTrue(actualSet + " does not contain all " + expected,
|
||||||
actualSet.containsAll(expected));
|
actualSet.containsAll(expected));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJmxUrlHTTP() {
|
||||||
|
verifyUrlSchemes(HttpConfig.Policy.HTTP_ONLY.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJmxUrlHTTPs() {
|
||||||
|
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyUrlSchemes(String scheme) {
|
||||||
|
|
||||||
|
// Attach our own log appender so we can verify output
|
||||||
|
final LogVerificationAppender appender =
|
||||||
|
new LogVerificationAppender();
|
||||||
|
final org.apache.log4j.Logger logger =
|
||||||
|
org.apache.log4j.Logger.getRootLogger();
|
||||||
|
logger.addAppender(appender);
|
||||||
|
logger.setLevel(Level.DEBUG);
|
||||||
|
|
||||||
|
// Setup and start the Router
|
||||||
|
Configuration conf = getNamenodesConfig();
|
||||||
|
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
|
||||||
|
Configuration routerConf = new RouterConfigBuilder(conf)
|
||||||
|
.heartbeat(true)
|
||||||
|
.build();
|
||||||
|
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
|
||||||
|
routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, "ns1.nn0");
|
||||||
|
router = new Router();
|
||||||
|
router.init(routerConf);
|
||||||
|
|
||||||
|
// Test the heartbeat services of the Router
|
||||||
|
Collection<NamenodeHeartbeatService> heartbeatServices =
|
||||||
|
router.getNamenodeHeartbeatServices();
|
||||||
|
for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
|
||||||
|
heartbeatService.getNamenodeStatusReport();
|
||||||
|
}
|
||||||
|
if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
|
||||||
|
assertEquals(1, appender.countLinesWithMessage("JMX URL: https://"));
|
||||||
|
assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
|
||||||
|
} else {
|
||||||
|
assertEquals(1, appender.countLinesWithMessage("JMX URL: http://"));
|
||||||
|
assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user