HDFS-17026. RBF: NamenodeHeartbeatService should update JMX report with configurable frequency. (#5691). Contributed by hchaverri.
Signed-off-by: Inigo Goiri <inigoiri@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
86c250a54a
commit
124313d215
@ -21,6 +21,8 @@
|
|||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEALTH_MONITOR_TIMEOUT_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
@ -47,6 +49,7 @@
|
|||||||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.codehaus.jettison.json.JSONArray;
|
import org.codehaus.jettison.json.JSONArray;
|
||||||
import org.codehaus.jettison.json.JSONException;
|
import org.codehaus.jettison.json.JSONException;
|
||||||
import org.codehaus.jettison.json.JSONObject;
|
import org.codehaus.jettison.json.JSONObject;
|
||||||
@ -107,6 +110,15 @@ public class NamenodeHeartbeatService extends PeriodicService {
|
|||||||
/** URL scheme to use for JMX calls. */
|
/** URL scheme to use for JMX calls. */
|
||||||
private String scheme;
|
private String scheme;
|
||||||
|
|
||||||
|
/** Frequency of updates to JMX report. */
|
||||||
|
private long updateJmxIntervalMs;
|
||||||
|
/** Timestamp of last attempt to update JMX report. */
|
||||||
|
private long lastJmxUpdateAttempt;
|
||||||
|
/** Result of the last successful FsNamesystemMetrics report. */
|
||||||
|
private JSONArray fsNamesystemMetrics;
|
||||||
|
/** Result of the last successful NamenodeInfoMetrics report. */
|
||||||
|
private JSONArray namenodeInfoMetrics;
|
||||||
|
|
||||||
private String resolvedHost;
|
private String resolvedHost;
|
||||||
private String originalNnId;
|
private String originalNnId;
|
||||||
|
|
||||||
@ -233,6 +245,9 @@ protected void serviceInit(Configuration configuration) throws Exception {
|
|||||||
this.healthMonitorTimeoutMs = (int) timeoutMs;
|
this.healthMonitorTimeoutMs = (int) timeoutMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.updateJmxIntervalMs = conf.getTimeDuration(DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
|
||||||
|
DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
super.serviceInit(configuration);
|
super.serviceInit(configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -447,8 +462,13 @@ private void updateJMXParameters(
|
|||||||
String address, NamenodeStatusReport report) {
|
String address, NamenodeStatusReport report) {
|
||||||
try {
|
try {
|
||||||
// TODO part of this should be moved to its own utility
|
// TODO part of this should be moved to its own utility
|
||||||
getFsNamesystemMetrics(address, report);
|
if (shouldUpdateJmx()) {
|
||||||
getNamenodeInfoMetrics(address, report);
|
this.lastJmxUpdateAttempt = Time.monotonicNow();
|
||||||
|
getFsNamesystemMetrics(address);
|
||||||
|
getNamenodeInfoMetrics(address);
|
||||||
|
}
|
||||||
|
populateFsNamesystemMetrics(this.fsNamesystemMetrics, report);
|
||||||
|
populateNamenodeInfoMetrics(this.namenodeInfoMetrics, report);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
|
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
|
||||||
}
|
}
|
||||||
@ -482,17 +502,38 @@ private void updateHAStatusParameters(NamenodeStatusReport report) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evaluates whether the JMX report should be refreshed by
|
||||||
|
* calling the Namenode, based on the following conditions:
|
||||||
|
* 1. JMX Updates must be enabled.
|
||||||
|
* 2. The last attempt to update JMX occurred before the
|
||||||
|
* configured interval (if any).
|
||||||
|
*/
|
||||||
|
private boolean shouldUpdateJmx() {
|
||||||
|
if (this.updateJmxIntervalMs < 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Time.monotonicNow() - this.lastJmxUpdateAttempt > this.updateJmxIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetches NamenodeInfo metrics from namenode.
|
* Fetches NamenodeInfo metrics from namenode.
|
||||||
* @param address Web interface of the Namenode to monitor.
|
* @param address Web interface of the Namenode to monitor.
|
||||||
* @param report Namenode status report to update with JMX data.
|
|
||||||
* @throws JSONException
|
|
||||||
*/
|
*/
|
||||||
private void getNamenodeInfoMetrics(String address,
|
private void getNamenodeInfoMetrics(String address) {
|
||||||
NamenodeStatusReport report) throws JSONException {
|
|
||||||
String query = "Hadoop:service=NameNode,name=NameNodeInfo";
|
String query = "Hadoop:service=NameNode,name=NameNodeInfo";
|
||||||
JSONArray aux =
|
this.namenodeInfoMetrics = FederationUtil.getJmx(query, address, connectionFactory, scheme);
|
||||||
FederationUtil.getJmx(query, address, connectionFactory, scheme);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populates NamenodeInfo metrics into report.
|
||||||
|
* @param aux NamenodeInfo metrics from namenode.
|
||||||
|
* @param report Namenode status report to update with JMX data.
|
||||||
|
* @throws JSONException When an invalid JSONObject is found
|
||||||
|
*/
|
||||||
|
private void populateNamenodeInfoMetrics(JSONArray aux, NamenodeStatusReport report)
|
||||||
|
throws JSONException {
|
||||||
if (aux != null && aux.length() > 0) {
|
if (aux != null && aux.length() > 0) {
|
||||||
JSONObject jsonObject = aux.getJSONObject(0);
|
JSONObject jsonObject = aux.getJSONObject(0);
|
||||||
String name = jsonObject.getString("name");
|
String name = jsonObject.getString("name");
|
||||||
@ -510,14 +551,20 @@ private void getNamenodeInfoMetrics(String address,
|
|||||||
/**
|
/**
|
||||||
* Fetches FSNamesystem* metrics from namenode.
|
* Fetches FSNamesystem* metrics from namenode.
|
||||||
* @param address Web interface of the Namenode to monitor.
|
* @param address Web interface of the Namenode to monitor.
|
||||||
* @param report Namenode status report to update with JMX data.
|
|
||||||
* @throws JSONException
|
|
||||||
*/
|
*/
|
||||||
private void getFsNamesystemMetrics(String address,
|
private void getFsNamesystemMetrics(String address) {
|
||||||
NamenodeStatusReport report) throws JSONException {
|
|
||||||
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
|
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
|
||||||
JSONArray aux = FederationUtil.getJmx(
|
this.fsNamesystemMetrics = FederationUtil.getJmx(query, address, connectionFactory, scheme);
|
||||||
query, address, connectionFactory, scheme);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populates FSNamesystem* metrics into report.
|
||||||
|
* @param aux FSNamesystem* metrics from namenode.
|
||||||
|
* @param report Namenode status report to update with JMX data.
|
||||||
|
* @throws JSONException When invalid JSONObject is found.
|
||||||
|
*/
|
||||||
|
private void populateFsNamesystemMetrics(JSONArray aux, NamenodeStatusReport report)
|
||||||
|
throws JSONException {
|
||||||
if (aux != null) {
|
if (aux != null) {
|
||||||
for (int i = 0; i < aux.length(); i++) {
|
for (int i = 0; i < aux.length(); i++) {
|
||||||
JSONObject jsonObject = aux.getJSONObject(i);
|
JSONObject jsonObject = aux.getJSONObject(i);
|
||||||
|
@ -115,6 +115,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
|
|||||||
FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
|
FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval";
|
||||||
public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
|
public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT =
|
||||||
TimeUnit.SECONDS.toMillis(5);
|
TimeUnit.SECONDS.toMillis(5);
|
||||||
|
public static final String DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS =
|
||||||
|
FEDERATION_ROUTER_PREFIX + "namenode.heartbeat.jmx.interval";
|
||||||
|
public static final long DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS_DEFAULT = 0;
|
||||||
|
|
||||||
// HDFS Router NN client
|
// HDFS Router NN client
|
||||||
public static final String
|
public static final String
|
||||||
|
@ -471,6 +471,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.federation.router.namenode.heartbeat.jmx.interval</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>
|
||||||
|
How often the Router should request JMX reports from the Namenode in miliseconds.
|
||||||
|
If this value is 0, it will request JMX reports every time a Namenode report is requested.
|
||||||
|
If this value is negative, it will disable JMX reports from the Namenode.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.federation.router.store.router.expiration</name>
|
<name>dfs.federation.router.store.router.expiration</name>
|
||||||
<value>5m</value>
|
<value>5m</value>
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
@ -293,7 +294,32 @@ public void testJmxUrlHTTPs() {
|
|||||||
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
|
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJmxRequestFrequency() {
|
||||||
|
// Disable JMX requests
|
||||||
|
Configuration conf = getNamenodesConfig();
|
||||||
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS, -1);
|
||||||
|
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 0, 1);
|
||||||
|
|
||||||
|
// Set JMX requests to lower frequency
|
||||||
|
conf = getNamenodesConfig();
|
||||||
|
conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_HEARTBEAT_JMX_INTERVAL_MS,
|
||||||
|
TimeUnit.MINUTES.toMillis(5));
|
||||||
|
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 1, 2);
|
||||||
|
|
||||||
|
// Set JMX requests to default frequency
|
||||||
|
conf = getNamenodesConfig();
|
||||||
|
verifyUrlSchemes(HttpConfig.Policy.HTTPS_ONLY.name(), conf, 0, 2, 2);
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyUrlSchemes(String scheme) {
|
private void verifyUrlSchemes(String scheme) {
|
||||||
|
int httpRequests = HttpConfig.Policy.HTTP_ONLY.name().equals(scheme) ? 1 : 0;
|
||||||
|
int httpsRequests = HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme) ? 1 : 0;
|
||||||
|
verifyUrlSchemes(scheme, getNamenodesConfig(), httpRequests, httpsRequests, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyUrlSchemes(String scheme, Configuration conf, int httpRequests,
|
||||||
|
int httpsRequests, int requestsPerService) {
|
||||||
|
|
||||||
// Attach our own log appender so we can verify output
|
// Attach our own log appender so we can verify output
|
||||||
final LogVerificationAppender appender =
|
final LogVerificationAppender appender =
|
||||||
@ -304,7 +330,6 @@ private void verifyUrlSchemes(String scheme) {
|
|||||||
GenericTestUtils.setRootLogLevel(Level.DEBUG);
|
GenericTestUtils.setRootLogLevel(Level.DEBUG);
|
||||||
|
|
||||||
// Setup and start the Router
|
// Setup and start the Router
|
||||||
Configuration conf = getNamenodesConfig();
|
|
||||||
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
|
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, scheme);
|
||||||
Configuration routerConf = new RouterConfigBuilder(conf)
|
Configuration routerConf = new RouterConfigBuilder(conf)
|
||||||
.heartbeat(true)
|
.heartbeat(true)
|
||||||
@ -318,15 +343,12 @@ private void verifyUrlSchemes(String scheme) {
|
|||||||
Collection<NamenodeHeartbeatService> heartbeatServices =
|
Collection<NamenodeHeartbeatService> heartbeatServices =
|
||||||
router.getNamenodeHeartbeatServices();
|
router.getNamenodeHeartbeatServices();
|
||||||
for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
|
for (NamenodeHeartbeatService heartbeatService : heartbeatServices) {
|
||||||
heartbeatService.getNamenodeStatusReport();
|
for (int request = 0; request < requestsPerService; request++) {
|
||||||
}
|
heartbeatService.getNamenodeStatusReport();
|
||||||
if (HttpConfig.Policy.HTTPS_ONLY.name().equals(scheme)) {
|
}
|
||||||
assertEquals(2, appender.countLinesWithMessage("JMX URL: https://"));
|
|
||||||
assertEquals(0, appender.countLinesWithMessage("JMX URL: http://"));
|
|
||||||
} else {
|
|
||||||
assertEquals(2, appender.countLinesWithMessage("JMX URL: http://"));
|
|
||||||
assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
|
|
||||||
}
|
}
|
||||||
|
assertEquals(httpsRequests * 2, appender.countLinesWithMessage("JMX URL: https://"));
|
||||||
|
assertEquals(httpRequests * 2, appender.countLinesWithMessage("JMX URL: http://"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
Reference in New Issue
Block a user