HDFS-11826. Federation Namenode Heartbeat. Contributed by Inigo Goiri.

(cherry picked from commit 928f8dab52191e733984d37f47b69719ccf11313)
This commit is contained in:
Inigo Goiri 2017-08-01 14:40:27 -07:00
parent 55da7fd7eb
commit d8c8107332
13 changed files with 1057 additions and 1 deletions

View File

@ -1150,6 +1150,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
// HDFS Router heartbeat
public static final String DFS_ROUTER_HEARTBEAT_ENABLE =
FEDERATION_ROUTER_PREFIX + "heartbeat.enable";
public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS =
FEDERATION_ROUTER_PREFIX + "heartbeat.interval";
public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(5);
public static final String DFS_ROUTER_MONITOR_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.namenode";
public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE =
FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable";
public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true;
// HDFS Router NN client
public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
FEDERATION_ROUTER_PREFIX + "connection.pool-size";

View File

@ -1323,6 +1323,44 @@ public static String getNamenodeServiceAddr(final Configuration conf,
return serviceRpcAddr;
}
/**
* Map a logical namenode ID to its web address. Use the given nameservice if
* specified, or the configured one if none is given.
*
* @param conf Configuration
* @param nsId which nameservice nnId is a part of, optional
* @param nnId the namenode ID to get the service addr for
* @return the service addr, null if it could not be determined
*/
public static String getNamenodeWebAddr(final Configuration conf, String nsId,
String nnId) {
if (nsId == null) {
nsId = getOnlyNameServiceIdOrNull(conf);
}
String webAddrKey = DFSUtilClient.concatSuffixes(
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nsId, nnId);
String webAddr =
conf.get(webAddrKey, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT);
return webAddr;
}
/**
* Get all of the Web addresses of the individual NNs in a given nameservice.
*
* @param conf Configuration
* @param nsId the nameservice whose NNs addresses we want.
* @param defaultValue default address to return in case key is not found.
* @return A map from nnId -> Web address of each NN in the nameservice.
*/
public static Map<String, InetSocketAddress> getWebAddressesForNameserviceId(
Configuration conf, String nsId, String defaultValue) {
return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
}
/**
* If the configuration refers to only a single nameservice, return the
* name of that nameservice. If it refers to 0 or more than 1, return null.

View File

@ -39,8 +39,29 @@ public class NamenodeStatusReport {
private HAServiceState status = HAServiceState.STANDBY;
private boolean safeMode = false;
/** Datanodes stats. */
private int liveDatanodes = -1;
private int deadDatanodes = -1;
/** Decommissioning datanodes. */
private int decomDatanodes = -1;
/** Live decommissioned datanodes. */
private int liveDecomDatanodes = -1;
/** Dead decommissioned datanodes. */
private int deadDecomDatanodes = -1;
/** Space stats. */
private long availableSpace = -1;
private long numOfFiles = -1;
private long numOfBlocks = -1;
private long numOfBlocksMissing = -1;
private long numOfBlocksPendingReplication = -1;
private long numOfBlocksUnderReplicated = -1;
private long numOfBlocksPendingDeletion = -1;
private long totalSpace = -1;
/** If the fields are valid. */
private boolean registrationValid = false;
private boolean statsValid = false;
private boolean haStateValid = false;
public NamenodeStatusReport(String ns, String nn, String rpc, String service,
@ -53,6 +74,15 @@ public NamenodeStatusReport(String ns, String nn, String rpc, String service,
this.webAddress = web;
}
/**
* If the statistics are valid.
*
* @return If the statistics are valid.
*/
public boolean statsValid() {
return this.statsValid;
}
/**
* If the registration is valid.
*
@ -187,6 +217,169 @@ public boolean getSafemode() {
return this.safeMode;
}
/**
* Set the datanode information.
*
* @param numLive Number of live nodes.
* @param numDead Number of dead nodes.
* @param numDecom Number of decommissioning nodes.
* @param numLiveDecom Number of decommissioned live nodes.
* @param numDeadDecom Number of decommissioned dead nodes.
*/
public void setDatanodeInfo(int numLive, int numDead, int numDecom,
int numLiveDecom, int numDeadDecom) {
this.liveDatanodes = numLive;
this.deadDatanodes = numDead;
this.decomDatanodes = numDecom;
this.liveDecomDatanodes = numLiveDecom;
this.deadDecomDatanodes = numDeadDecom;
this.statsValid = true;
}
/**
* Get the number of live blocks.
*
* @return The number of dead nodes.
*/
public int getNumLiveDatanodes() {
return this.liveDatanodes;
}
/**
* Get the number of dead blocks.
*
* @return The number of dead nodes.
*/
public int getNumDeadDatanodes() {
return this.deadDatanodes;
}
/**
* Get the number of decommissionining nodes.
*
* @return The number of decommissionining nodes.
*/
public int getNumDecommissioningDatanodes() {
return this.decomDatanodes;
}
/**
* Get the number of live decommissioned nodes.
*
* @return The number of live decommissioned nodes.
*/
public int getNumDecomLiveDatanodes() {
return this.liveDecomDatanodes;
}
/**
* Get the number of dead decommissioned nodes.
*
* @return The number of dead decommissioned nodes.
*/
public int getNumDecomDeadDatanodes() {
return this.deadDecomDatanodes;
}
/**
* Set the filesystem information.
*
* @param available Available capacity.
* @param total Total capacity.
* @param numFiles Number of files.
* @param numBlocks Total number of blocks.
* @param numBlocksMissing Number of missing blocks.
* @param numOfBlocksPendingReplication Number of blocks pending replication.
* @param numOfBlocksUnderReplicated Number of blocks under replication.
* @param numOfBlocksPendingDeletion Number of blocks pending deletion.
*/
public void setNamesystemInfo(long available, long total,
long numFiles, long numBlocks, long numBlocksMissing,
long numBlocksPendingReplication, long numBlocksUnderReplicated,
long numBlocksPendingDeletion) {
this.totalSpace = total;
this.availableSpace = available;
this.numOfBlocks = numBlocks;
this.numOfBlocksMissing = numBlocksMissing;
this.numOfBlocksPendingReplication = numBlocksPendingReplication;
this.numOfBlocksUnderReplicated = numBlocksUnderReplicated;
this.numOfBlocksPendingDeletion = numBlocksPendingDeletion;
this.numOfFiles = numFiles;
this.statsValid = true;
}
/**
* Get the number of blocks.
*
* @return The number of blocks.
*/
public long getNumBlocks() {
return this.numOfBlocks;
}
/**
* Get the number of files.
*
* @return The number of files.
*/
public long getNumFiles() {
return this.numOfFiles;
}
/**
* Get the total space.
*
* @return The total space.
*/
public long getTotalSpace() {
return this.totalSpace;
}
/**
* Get the available space.
*
* @return The available space.
*/
public long getAvailableSpace() {
return this.availableSpace;
}
/**
* Get the number of missing blocks.
*
* @return Number of missing blocks.
*/
public long getNumBlocksMissing() {
return this.numOfBlocksMissing;
}
/**
* Get the number of pending replication blocks.
*
* @return Number of pending replication blocks.
*/
public long getNumOfBlocksPendingReplication() {
return this.numOfBlocksPendingReplication;
}
/**
* Get the number of under replicated blocks.
*
* @return Number of under replicated blocks.
*/
public long getNumOfBlocksUnderReplicated() {
return this.numOfBlocksUnderReplicated;
}
/**
* Get the number of pending deletion blocks.
*
* @return Number of pending deletion blocks.
*/
public long getNumOfBlocksPendingDeletion() {
return this.numOfBlocksPendingDeletion;
}
@Override
public String toString() {
return String.format("%s-%s:%s",

View File

@ -17,13 +17,22 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,6 +48,63 @@ private FederationUtil() {
// Utility Class
}
/**
* Get a JMX data from a web endpoint.
*
* @param beanQuery JMX bean.
* @param webAddress Web address of the JMX endpoint.
* @return JSON with the JMX data
*/
public static JSONArray getJmx(String beanQuery, String webAddress) {
JSONArray ret = null;
BufferedReader reader = null;
try {
String host = webAddress;
int port = -1;
if (webAddress.indexOf(":") > 0) {
String[] webAddressSplit = webAddress.split(":");
host = webAddressSplit[0];
port = Integer.parseInt(webAddressSplit[1]);
}
URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery);
URLConnection conn = jmxURL.openConnection();
conn.setConnectTimeout(5 * 1000);
conn.setReadTimeout(5 * 1000);
InputStream in = conn.getInputStream();
InputStreamReader isr = new InputStreamReader(in, "UTF-8");
reader = new BufferedReader(isr);
StringBuilder sb = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
sb.append(line);
}
String jmxOutput = sb.toString();
// Parse JSON
JSONObject json = new JSONObject(jmxOutput);
ret = json.getJSONArray("beans");
} catch (IOException e) {
LOG.error("Cannot read JMX bean {} from server {}: {}",
beanQuery, webAddress, e.getMessage());
} catch (JSONException e) {
LOG.error("Cannot parse JMX output for {} from server {}: {}",
beanQuery, webAddress, e.getMessage());
} catch (Exception e) {
LOG.error("Cannot parse JMX output for {} from server {}: {}",
beanQuery, webAddress, e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
LOG.error("Problem closing {}", webAddress, e);
}
}
}
return ret;
}
/**
* Create an instance of an interface with a constructor using a context.
*

View File

@ -0,0 +1,350 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link Router} periodically checks the state of a Namenode (usually on
* the same server) and reports their high availability (HA) state and
* load/space status to the
* {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService}
* . Note that this is an optional role as a Router can be independent of any
* subcluster.
* <p>
* For performance with Namenode HA, the Router uses the high availability state
* information in the State Store to forward the request to the Namenode that is
* most likely to be active.
* <p>
* Note that this service can be embedded into the Namenode itself to simplify
* the operation.
*/
public class NamenodeHeartbeatService extends PeriodicService {
private static final Logger LOG =
LoggerFactory.getLogger(NamenodeHeartbeatService.class);
/** Configuration for the heartbeat. */
private Configuration conf;
/** Router performing the heartbeating. */
private final ActiveNamenodeResolver resolver;
/** Interface to the tracked NN. */
private final String nameserviceId;
private final String namenodeId;
/** Namenode HA target. */
private NNHAServiceTarget localTarget;
/** RPC address for the namenode. */
private String rpcAddress;
/** Service RPC address for the namenode. */
private String serviceAddress;
/** Service RPC address for the namenode. */
private String lifelineAddress;
/** HTTP address for the namenode. */
private String webAddress;
/**
* Create a new Namenode status updater.
* @param resolver Namenode resolver service to handle NN registration.
* @param nameserviceId Identifier of the nameservice.
* @param namenodeId Identifier of the namenode in HA.
*/
public NamenodeHeartbeatService(
ActiveNamenodeResolver resolver, String nsId, String nnId) {
super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " +
nnId);
this.resolver = resolver;
this.nameserviceId = nsId;
this.namenodeId = nnId;
}
@Override
protected void serviceInit(Configuration configuration) throws Exception {
this.conf = configuration;
if (this.namenodeId != null && !this.namenodeId.isEmpty()) {
this.localTarget = new NNHAServiceTarget(
conf, nameserviceId, namenodeId);
} else {
this.localTarget = null;
}
// Get the RPC address for the clients to connect
this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId);
LOG.info("{}-{} RPC address: {}",
nameserviceId, namenodeId, rpcAddress);
// Get the Service RPC address for monitoring
this.serviceAddress =
DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId);
if (this.serviceAddress == null) {
LOG.error("Cannot locate RPC service address for NN {}-{}, " +
"using RPC address {}", nameserviceId, namenodeId, this.rpcAddress);
this.serviceAddress = this.rpcAddress;
}
LOG.info("{}-{} Service RPC address: {}",
nameserviceId, namenodeId, serviceAddress);
// Get the Lifeline RPC address for faster monitoring
this.lifelineAddress =
DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId);
if (this.lifelineAddress == null) {
this.lifelineAddress = this.serviceAddress;
}
LOG.info("{}-{} Lifeline RPC address: {}",
nameserviceId, namenodeId, lifelineAddress);
// Get the Web address for UI
this.webAddress =
DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId);
LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress);
this.setIntervalMs(conf.getLong(
DFS_ROUTER_HEARTBEAT_INTERVAL_MS,
DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT));
super.serviceInit(configuration);
}
@Override
public void periodicInvoke() {
updateState();
}
/**
* Get the RPC address for a Namenode.
* @param conf Configuration.
* @param nsId Name service identifier.
* @param nnId Name node identifier.
* @return RPC address in format hostname:1234.
*/
private static String getRpcAddress(
Configuration conf, String nsId, String nnId) {
// Get it from the regular RPC setting
String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
String ret = conf.get(confKey);
if (nsId != null && nnId != null) {
// Get if for the proper nameservice and namenode
confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId);
ret = conf.get(confKey);
// If not available, get it from the map
if (ret == null) {
Map<String, InetSocketAddress> rpcAddresses =
DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null);
if (rpcAddresses.containsKey(nnId)) {
InetSocketAddress sockAddr = rpcAddresses.get(nnId);
InetAddress addr = sockAddr.getAddress();
ret = addr.getHostAddress() + ":" + sockAddr.getPort();
}
}
}
return ret;
}
/**
* Update the state of the Namenode.
*/
private void updateState() {
NamenodeStatusReport report = getNamenodeStatusReport();
if (!report.registrationValid()) {
// Not operational
LOG.error("Namenode is not operational: {}", getNamenodeDesc());
} else if (report.haStateValid()) {
// block and HA status available
LOG.debug("Received service state: {} from HA namenode: {}",
report.getState(), getNamenodeDesc());
} else if (localTarget == null) {
// block info available, HA status not expected
LOG.debug(
"Reporting non-HA namenode as operational: " + getNamenodeDesc());
} else {
// block info available, HA status should be available, but was not
// fetched do nothing and let the current state stand
return;
}
try {
if (!resolver.registerNamenode(report)) {
LOG.warn("Cannot register namenode {}", report);
}
} catch (IOException e) {
LOG.info("Cannot register namenode in the State Store");
} catch (Exception ex) {
LOG.error("Unhandled exception updating NN registration for {}",
getNamenodeDesc(), ex);
}
}
/**
* Get the status report for the Namenode monitored by this heartbeater.
* @return Namenode status report.
*/
protected NamenodeStatusReport getNamenodeStatusReport() {
NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId,
namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress);
try {
LOG.debug("Probing NN at service address: {}", serviceAddress);
URI serviceURI = new URI("hdfs://" + serviceAddress);
// Read the filesystem info from RPC (required)
NamenodeProtocol nn = NameNodeProxies
.createProxy(this.conf, serviceURI, NamenodeProtocol.class)
.getProxy();
if (nn != null) {
NamespaceInfo info = nn.versionRequest();
if (info != null) {
report.setNamespaceInfo(info);
}
}
if (!report.registrationValid()) {
return report;
}
// Check for safemode from the client protocol. Currently optional, but
// should be required at some point for QoS
try {
ClientProtocol client = NameNodeProxies
.createProxy(this.conf, serviceURI, ClientProtocol.class)
.getProxy();
if (client != null) {
boolean isSafeMode = client.setSafeMode(
SafeModeAction.SAFEMODE_GET, false);
report.setSafeMode(isSafeMode);
}
} catch (Exception e) {
LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e);
}
// Read the stats from JMX (optional)
updateJMXParameters(webAddress, report);
if (localTarget != null) {
// Try to get the HA status
try {
// Determine if NN is active
// TODO: dynamic timeout
HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000);
HAServiceStatus status = haProtocol.getServiceStatus();
report.setHAServiceState(status.getState());
} catch (Throwable e) {
// Failed to fetch HA status, ignoring failure
LOG.error("Cannot fetch HA status for {}: {}",
getNamenodeDesc(), e.getMessage(), e);
}
}
} catch(IOException e) {
LOG.error("Cannot communicate with {}: {}",
getNamenodeDesc(), e.getMessage());
} catch(Throwable e) {
// Generic error that we don't know about
LOG.error("Unexpected exception while communicating with {}: {}",
getNamenodeDesc(), e.getMessage(), e);
}
return report;
}
/**
* Get the description of the Namenode to monitor.
* @return Description of the Namenode to monitor.
*/
public String getNamenodeDesc() {
if (namenodeId != null && !namenodeId.isEmpty()) {
return nameserviceId + "-" + namenodeId + ":" + serviceAddress;
} else {
return nameserviceId + ":" + serviceAddress;
}
}
/**
* Get the parameters for a Namenode from JMX and add them to the report.
* @param webAddress Web interface of the Namenode to monitor.
* @param report Namenode status report to update with JMX data.
*/
private void updateJMXParameters(
String address, NamenodeStatusReport report) {
try {
// TODO part of this should be moved to its own utility
String query = "Hadoop:service=NameNode,name=FSNamesystem*";
JSONArray aux = FederationUtil.getJmx(query, address);
if (aux != null) {
for (int i = 0; i < aux.length(); i++) {
JSONObject jsonObject = aux.getJSONObject(i);
String name = jsonObject.getString("name");
if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) {
report.setDatanodeInfo(
jsonObject.getInt("NumLiveDataNodes"),
jsonObject.getInt("NumDeadDataNodes"),
jsonObject.getInt("NumDecommissioningDataNodes"),
jsonObject.getInt("NumDecomLiveDataNodes"),
jsonObject.getInt("NumDecomDeadDataNodes"));
} else if (name.equals(
"Hadoop:service=NameNode,name=FSNamesystem")) {
report.setNamesystemInfo(
jsonObject.getLong("CapacityRemaining"),
jsonObject.getLong("CapacityTotal"),
jsonObject.getLong("FilesTotal"),
jsonObject.getLong("BlocksTotal"),
jsonObject.getLong("MissingBlocks"),
jsonObject.getLong("PendingReplicationBlocks"),
jsonObject.getLong("UnderReplicatedBlocks"),
jsonObject.getLong("PendingDeletionBlocks"));
}
}
}
} catch (Exception e) {
LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e);
}
}
}

View File

@ -25,12 +25,16 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -85,6 +89,8 @@ public class Router extends CompositeService {
/** Interface to identify the active NN for a nameservice or blockpool ID. */
private ActiveNamenodeResolver namenodeResolver;
/** Updates the namenode status in the namenode resolver. */
private Collection<NamenodeHeartbeatService> namenodeHearbeatServices;
/** Usage string for help message. */
@ -133,6 +139,22 @@ protected void serviceInit(Configuration configuration) throws Exception {
this.setRpcServerAddress(rpcServer.getRpcAddress());
}
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) {
// Create status updater for each monitored Namenode
this.namenodeHearbeatServices = createNamenodeHearbeatServices();
for (NamenodeHeartbeatService hearbeatService :
this.namenodeHearbeatServices) {
addService(hearbeatService);
}
if (this.namenodeHearbeatServices.isEmpty()) {
LOG.error("Heartbeat is enabled but there are no namenodes to monitor");
}
}
super.serviceInit(conf);
}
@ -242,6 +264,96 @@ public InetSocketAddress getRpcServerAddress() {
return this.rpcAddress;
}
/////////////////////////////////////////////////////////
// Namenode heartbeat monitors
/////////////////////////////////////////////////////////
/**
* Create each of the services that will monitor a Namenode.
*
* @return List of heartbeat services.
*/
protected Collection<NamenodeHeartbeatService>
createNamenodeHearbeatServices() {
Map<String, NamenodeHeartbeatService> ret = new HashMap<>();
if (conf.getBoolean(
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) {
// Create a local heartbet service
NamenodeHeartbeatService localHeartbeatService =
createLocalNamenodeHearbeatService();
if (localHeartbeatService != null) {
String nnDesc = localHeartbeatService.getNamenodeDesc();
ret.put(nnDesc, localHeartbeatService);
}
}
// Create heartbeat services for a list specified by the admin
String namenodes = this.conf.get(
DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
if (namenodes != null) {
for (String namenode : namenodes.split(",")) {
String[] namenodeSplit = namenode.split("\\.");
String nsId = null;
String nnId = null;
if (namenodeSplit.length == 2) {
nsId = namenodeSplit[0];
nnId = namenodeSplit[1];
} else if (namenodeSplit.length == 1) {
nsId = namenode;
} else {
LOG.error("Wrong Namenode to monitor: {}", namenode);
}
if (nsId != null) {
NamenodeHeartbeatService heartbeatService =
createNamenodeHearbeatService(nsId, nnId);
if (heartbeatService != null) {
ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
}
}
}
}
return ret.values();
}
/**
* Create a new status updater for the local Namenode.
*
* @return Updater of the status for the local Namenode.
*/
protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() {
// Detect NN running in this machine
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String nnId = null;
if (HAUtil.isHAEnabled(conf, nsId)) {
nnId = HAUtil.getNameNodeId(conf, nsId);
if (nnId == null) {
LOG.error("Cannot find namenode id for local {}", nsId);
}
}
return createNamenodeHearbeatService(nsId, nnId);
}
/**
* Create a heartbeat monitor for a particular Namenode.
*
* @param nsId Identifier of the nameservice to monitor.
* @param nnId Identifier of the namenode (HA) to monitor.
* @return Updater of the status for the specified Namenode.
*/
protected NamenodeHeartbeatService createNamenodeHearbeatService(
String nsId, String nnId) {
LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId);
NamenodeHeartbeatService ret = new NamenodeHeartbeatService(
namenodeResolver, nsId, nnId);
return ret;
}
/////////////////////////////////////////////////////////
// Submodule getters
/////////////////////////////////////////////////////////

View File

@ -4801,4 +4801,36 @@
</description>
</property>
<property>
<name>dfs.federation.router.heartbeat.enable</name>
<value>true</value>
<description>
Enables the Router to heartbeat into the State Store.
</description>
</property>
<property>
<name>dfs.federation.router.heartbeat.interval</name>
<value>5000</value>
<description>
How often the Router should heartbeat into the State Store in milliseconds.
</description>
</property>
<property>
<name>dfs.federation.router.monitor.namenode</name>
<value></value>
<description>
The identifier of the namenodes to monitor and heartbeat.
</description>
</property>
<property>
<name>dfs.federation.router.monitor.localnamenode.enable</name>
<value>true</value>
<description>
If the Router should monitor the namenode in the local machine.
</description>
</property>
</configuration>

View File

@ -582,6 +582,14 @@ public static class NameNodeInfo {
public void setStartOpt(StartupOption startOpt) {
this.startOpt = startOpt;
}
public String getNameserviceId() {
return this.nameserviceId;
}
public String getNamenodeId() {
return this.nnId;
}
}
/**

View File

@ -56,9 +56,16 @@ public class MockResolver
private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
private String defaultNamespace = null;
public MockResolver() {
this.cleanRegistrations();
}
public MockResolver(Configuration conf) {
this();
}
public MockResolver(Configuration conf, StateStoreService store) {
this.cleanRegistrations();
this();
}
public void addLocation(String mount, String nsId, String location) {

View File

@ -28,6 +28,8 @@ public class RouterConfigBuilder {
private Configuration conf;
private boolean enableRpcServer = false;
private boolean enableHeartbeat = false;
private boolean enableLocalHeartbeat = false;
public RouterConfigBuilder(Configuration configuration) {
this.conf = configuration;
@ -39,6 +41,13 @@ public RouterConfigBuilder() {
public RouterConfigBuilder all() {
this.enableRpcServer = true;
this.enableHeartbeat = true;
this.enableLocalHeartbeat = true;
return this;
}
public RouterConfigBuilder enableLocalHeartbeat(boolean enable) {
this.enableLocalHeartbeat = enable;
return this;
}
@ -47,12 +56,25 @@ public RouterConfigBuilder rpc(boolean enable) {
return this;
}
public RouterConfigBuilder heartbeat(boolean enable) {
this.enableHeartbeat = enable;
return this;
}
public RouterConfigBuilder rpc() {
return this.rpc(true);
}
public RouterConfigBuilder heartbeat() {
return this.heartbeat(true);
}
public Configuration build() {
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE,
this.enableHeartbeat);
conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE,
this.enableLocalHeartbeat);
return conf;
}
}

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
@ -753,6 +754,48 @@ public String getNamenodeTestFileForNS(String nsId) {
return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
}
/**
* Switch a namenode in a nameservice to be the active.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
*/
public void switchToActive(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
cluster.transitionToActive(i);
}
}
} catch (Throwable e) {
LOG.error("Cannot transition to active", e);
}
}
/**
* Switch a namenode in a nameservice to be in standby.
* @param nsId Nameservice identifier.
* @param nnId Namenode identifier.
*/
public void switchToStandby(String nsId, String nnId) {
try {
int total = cluster.getNumNameNodes();
NameNodeInfo[] nns = cluster.getNameNodeInfos();
for (int i = 0; i < total; i++) {
NameNodeInfo nn = nns[i];
if (nn.getNameserviceId().equals(nsId) &&
nn.getNamenodeId().equals(nnId)) {
cluster.transitionToStandby(i);
}
}
} catch (Throwable e) {
LOG.error("Cannot transition to standby", e);
}
}
/**
* Stop the federated HDFS cluster.
*/

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.MockResolver;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.service.Service.STATE;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
/**
* Test the service that heartbeats the state of the namenodes to the State
* Store.
*/
public class TestNamenodeHeartbeat {
private static RouterDFSCluster cluster;
private static ActiveNamenodeResolver namenodeResolver;
private static List<NamenodeHeartbeatService> services;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void globalSetUp() throws Exception {
cluster = new RouterDFSCluster(true, 2);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Mock locator that records the heartbeats
List<String> nss = cluster.getNameservices();
String ns = nss.get(0);
Configuration conf = cluster.generateNamenodeConfiguration(ns);
namenodeResolver = new MockResolver(conf);
namenodeResolver.setRouterId("testrouter");
// Create one heartbeat service per NN
services = new ArrayList<>();
for (NamenodeContext nn : cluster.getNamenodes()) {
String nsId = nn.getNameserviceId();
String nnId = nn.getNamenodeId();
NamenodeHeartbeatService service = new NamenodeHeartbeatService(
namenodeResolver, nsId, nnId);
service.init(conf);
service.start();
services.add(service);
}
}
@AfterClass
public static void tearDown() throws IOException {
cluster.shutdown();
for (NamenodeHeartbeatService service: services) {
service.stop();
service.close();
}
}
@Test
public void testNamenodeHeartbeatService() throws IOException {
RouterDFSCluster testCluster = new RouterDFSCluster(true, 1);
Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration(
NAMESERVICES[0]);
NamenodeHeartbeatService server = new NamenodeHeartbeatService(
namenodeResolver, NAMESERVICES[0], NAMENODES[0]);
server.init(heartbeatConfig);
assertEquals(STATE.INITED, server.getServiceState());
server.start();
assertEquals(STATE.STARTED, server.getServiceState());
server.stop();
assertEquals(STATE.STOPPED, server.getServiceState());
server.close();
}
@Test
public void testHearbeat() throws InterruptedException, IOException {
// Set NAMENODE1 to active for all nameservices
if (cluster.isHighAvailability()) {
for (String ns : cluster.getNameservices()) {
cluster.switchToActive(ns, NAMENODES[0]);
cluster.switchToStandby(ns, NAMENODES[1]);
}
}
// Wait for heartbeats to record
Thread.sleep(5000);
// Verify the locator has matching NN entries for each NS
for (String ns : cluster.getNameservices()) {
List<? extends FederationNamenodeContext> nns =
namenodeResolver.getNamenodesForNameserviceId(ns);
// Active
FederationNamenodeContext active = nns.get(0);
assertEquals(NAMENODES[0], active.getNamenodeId());
// Standby
FederationNamenodeContext standby = nns.get(1);
assertEquals(NAMENODES[1], standby.getNamenodeId());
}
// Switch active NNs in 1/2 nameservices
List<String> nss = cluster.getNameservices();
String failoverNS = nss.get(0);
String normalNs = nss.get(1);
cluster.switchToStandby(failoverNS, NAMENODES[0]);
cluster.switchToActive(failoverNS, NAMENODES[1]);
// Wait for heartbeats to record
Thread.sleep(5000);
// Verify the locator has recorded the failover for the failover NS
List<? extends FederationNamenodeContext> failoverNSs =
namenodeResolver.getNamenodesForNameserviceId(failoverNS);
// Active
FederationNamenodeContext active = failoverNSs.get(0);
assertEquals(NAMENODES[1], active.getNamenodeId());
// Standby
FederationNamenodeContext standby = failoverNSs.get(1);
assertEquals(NAMENODES[0], standby.getNamenodeId());
// Verify the locator has the same records for the other ns
List<? extends FederationNamenodeContext> normalNss =
namenodeResolver.getNamenodesForNameserviceId(normalNs);
// Active
active = normalNss.get(0);
assertEquals(NAMENODES[0], active.getNamenodeId());
// Standby
standby = normalNss.get(1);
assertEquals(NAMENODES[1], standby.getNamenodeId());
}
}

View File

@ -98,6 +98,9 @@ public void testRouterService() throws InterruptedException, IOException {
// Rpc only
testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
// Heartbeat only
testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
// Run with all services
testRouterStartup(new RouterConfigBuilder(conf).all().build());
}