diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f3365d0f94..2871bdc8a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1150,6 +1150,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_ROUTER_PREFIX + "rpc.enable"; public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + // HDFS Router heartbeat + public static final String DFS_ROUTER_HEARTBEAT_ENABLE = + FEDERATION_ROUTER_PREFIX + "heartbeat.enable"; + public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS = + FEDERATION_ROUTER_PREFIX + "heartbeat.interval"; + public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(5); + public static final String DFS_ROUTER_MONITOR_NAMENODE = + FEDERATION_ROUTER_PREFIX + "monitor.namenode"; + public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE = + FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable"; + public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true; + // HDFS Router NN client public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = FEDERATION_ROUTER_PREFIX + "connection.pool-size"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index 32a1caeaa3..2f9781a455 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -1323,6 +1323,44 @@ public static String getNamenodeServiceAddr(final Configuration conf, return serviceRpcAddr; } + /** + * Map a logical namenode ID to its web address. Use the given nameservice if + * specified, or the configured one if none is given. + * + * @param conf Configuration + * @param nsId which nameservice nnId is a part of, optional + * @param nnId the namenode ID to get the service addr for + * @return the service addr, null if it could not be determined + */ + public static String getNamenodeWebAddr(final Configuration conf, String nsId, + String nnId) { + + if (nsId == null) { + nsId = getOnlyNameServiceIdOrNull(conf); + } + + String webAddrKey = DFSUtilClient.concatSuffixes( + DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, nsId, nnId); + + String webAddr = + conf.get(webAddrKey, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT); + return webAddr; + } + + /** + * Get all of the Web addresses of the individual NNs in a given nameservice. + * + * @param conf Configuration + * @param nsId the nameservice whose NNs addresses we want. + * @param defaultValue default address to return in case key is not found. + * @return A map from nnId -> Web address of each NN in the nameservice. + */ + public static Map getWebAddressesForNameserviceId( + Configuration conf, String nsId, String defaultValue) { + return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue, + DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY); + } + /** * If the configuration refers to only a single nameservice, return the * name of that nameservice. If it refers to 0 or more than 1, return null. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java index 9259048f26..f8759e8713 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java @@ -39,8 +39,29 @@ public class NamenodeStatusReport { private HAServiceState status = HAServiceState.STANDBY; private boolean safeMode = false; + /** Datanodes stats. */ + private int liveDatanodes = -1; + private int deadDatanodes = -1; + /** Decommissioning datanodes. */ + private int decomDatanodes = -1; + /** Live decommissioned datanodes. */ + private int liveDecomDatanodes = -1; + /** Dead decommissioned datanodes. */ + private int deadDecomDatanodes = -1; + + /** Space stats. */ + private long availableSpace = -1; + private long numOfFiles = -1; + private long numOfBlocks = -1; + private long numOfBlocksMissing = -1; + private long numOfBlocksPendingReplication = -1; + private long numOfBlocksUnderReplicated = -1; + private long numOfBlocksPendingDeletion = -1; + private long totalSpace = -1; + /** If the fields are valid. */ private boolean registrationValid = false; + private boolean statsValid = false; private boolean haStateValid = false; public NamenodeStatusReport(String ns, String nn, String rpc, String service, @@ -53,6 +74,15 @@ public NamenodeStatusReport(String ns, String nn, String rpc, String service, this.webAddress = web; } + /** + * If the statistics are valid. + * + * @return If the statistics are valid. + */ + public boolean statsValid() { + return this.statsValid; + } + /** * If the registration is valid. * @@ -187,6 +217,169 @@ public boolean getSafemode() { return this.safeMode; } + /** + * Set the datanode information. + * + * @param numLive Number of live nodes. + * @param numDead Number of dead nodes. + * @param numDecom Number of decommissioning nodes. + * @param numLiveDecom Number of decommissioned live nodes. + * @param numDeadDecom Number of decommissioned dead nodes. + */ + public void setDatanodeInfo(int numLive, int numDead, int numDecom, + int numLiveDecom, int numDeadDecom) { + this.liveDatanodes = numLive; + this.deadDatanodes = numDead; + this.decomDatanodes = numDecom; + this.liveDecomDatanodes = numLiveDecom; + this.deadDecomDatanodes = numDeadDecom; + this.statsValid = true; + } + + /** + * Get the number of live blocks. + * + * @return The number of dead nodes. + */ + public int getNumLiveDatanodes() { + return this.liveDatanodes; + } + + /** + * Get the number of dead blocks. + * + * @return The number of dead nodes. + */ + public int getNumDeadDatanodes() { + return this.deadDatanodes; + } + + /** + * Get the number of decommissionining nodes. + * + * @return The number of decommissionining nodes. + */ + public int getNumDecommissioningDatanodes() { + return this.decomDatanodes; + } + + /** + * Get the number of live decommissioned nodes. + * + * @return The number of live decommissioned nodes. + */ + public int getNumDecomLiveDatanodes() { + return this.liveDecomDatanodes; + } + + /** + * Get the number of dead decommissioned nodes. + * + * @return The number of dead decommissioned nodes. + */ + public int getNumDecomDeadDatanodes() { + return this.deadDecomDatanodes; + } + + /** + * Set the filesystem information. + * + * @param available Available capacity. + * @param total Total capacity. + * @param numFiles Number of files. + * @param numBlocks Total number of blocks. + * @param numBlocksMissing Number of missing blocks. + * @param numOfBlocksPendingReplication Number of blocks pending replication. + * @param numOfBlocksUnderReplicated Number of blocks under replication. + * @param numOfBlocksPendingDeletion Number of blocks pending deletion. + */ + public void setNamesystemInfo(long available, long total, + long numFiles, long numBlocks, long numBlocksMissing, + long numBlocksPendingReplication, long numBlocksUnderReplicated, + long numBlocksPendingDeletion) { + this.totalSpace = total; + this.availableSpace = available; + this.numOfBlocks = numBlocks; + this.numOfBlocksMissing = numBlocksMissing; + this.numOfBlocksPendingReplication = numBlocksPendingReplication; + this.numOfBlocksUnderReplicated = numBlocksUnderReplicated; + this.numOfBlocksPendingDeletion = numBlocksPendingDeletion; + this.numOfFiles = numFiles; + this.statsValid = true; + } + + /** + * Get the number of blocks. + * + * @return The number of blocks. + */ + public long getNumBlocks() { + return this.numOfBlocks; + } + + /** + * Get the number of files. + * + * @return The number of files. + */ + public long getNumFiles() { + return this.numOfFiles; + } + + /** + * Get the total space. + * + * @return The total space. + */ + public long getTotalSpace() { + return this.totalSpace; + } + + /** + * Get the available space. + * + * @return The available space. + */ + public long getAvailableSpace() { + return this.availableSpace; + } + + /** + * Get the number of missing blocks. + * + * @return Number of missing blocks. + */ + public long getNumBlocksMissing() { + return this.numOfBlocksMissing; + } + + /** + * Get the number of pending replication blocks. + * + * @return Number of pending replication blocks. + */ + public long getNumOfBlocksPendingReplication() { + return this.numOfBlocksPendingReplication; + } + + /** + * Get the number of under replicated blocks. + * + * @return Number of under replicated blocks. + */ + public long getNumOfBlocksUnderReplicated() { + return this.numOfBlocksUnderReplicated; + } + + /** + * Get the number of pending deletion blocks. + * + * @return Number of pending deletion blocks. + */ + public long getNumOfBlocksPendingDeletion() { + return this.numOfBlocksPendingDeletion; + } + @Override public String toString() { return String.format("%s-%s:%s", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java index 0129a37c0c..78c473a3d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java @@ -17,13 +17,22 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.reflect.Constructor; +import java.net.URL; +import java.net.URLConnection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +48,63 @@ private FederationUtil() { // Utility Class } + /** + * Get a JMX data from a web endpoint. + * + * @param beanQuery JMX bean. + * @param webAddress Web address of the JMX endpoint. + * @return JSON with the JMX data + */ + public static JSONArray getJmx(String beanQuery, String webAddress) { + JSONArray ret = null; + BufferedReader reader = null; + try { + String host = webAddress; + int port = -1; + if (webAddress.indexOf(":") > 0) { + String[] webAddressSplit = webAddress.split(":"); + host = webAddressSplit[0]; + port = Integer.parseInt(webAddressSplit[1]); + } + URL jmxURL = new URL("http", host, port, "/jmx?qry=" + beanQuery); + URLConnection conn = jmxURL.openConnection(); + conn.setConnectTimeout(5 * 1000); + conn.setReadTimeout(5 * 1000); + InputStream in = conn.getInputStream(); + InputStreamReader isr = new InputStreamReader(in, "UTF-8"); + reader = new BufferedReader(isr); + + StringBuilder sb = new StringBuilder(); + String line = null; + while ((line = reader.readLine()) != null) { + sb.append(line); + } + String jmxOutput = sb.toString(); + + // Parse JSON + JSONObject json = new JSONObject(jmxOutput); + ret = json.getJSONArray("beans"); + } catch (IOException e) { + LOG.error("Cannot read JMX bean {} from server {}: {}", + beanQuery, webAddress, e.getMessage()); + } catch (JSONException e) { + LOG.error("Cannot parse JMX output for {} from server {}: {}", + beanQuery, webAddress, e.getMessage()); + } catch (Exception e) { + LOG.error("Cannot parse JMX output for {} from server {}: {}", + beanQuery, webAddress, e); + } finally { + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + LOG.error("Problem closing {}", webAddress, e); + } + } + } + return ret; + } + /** * Create an instance of an interface with a constructor using a context. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java new file mode 100644 index 0000000000..fe4f939d6b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java @@ -0,0 +1,350 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link Router} periodically checks the state of a Namenode (usually on + * the same server) and reports their high availability (HA) state and + * load/space status to the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService} + * . Note that this is an optional role as a Router can be independent of any + * subcluster. + *

+ * For performance with Namenode HA, the Router uses the high availability state + * information in the State Store to forward the request to the Namenode that is + * most likely to be active. + *

+ * Note that this service can be embedded into the Namenode itself to simplify + * the operation. + */ +public class NamenodeHeartbeatService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(NamenodeHeartbeatService.class); + + + /** Configuration for the heartbeat. */ + private Configuration conf; + + /** Router performing the heartbeating. */ + private final ActiveNamenodeResolver resolver; + + /** Interface to the tracked NN. */ + private final String nameserviceId; + private final String namenodeId; + + /** Namenode HA target. */ + private NNHAServiceTarget localTarget; + /** RPC address for the namenode. */ + private String rpcAddress; + /** Service RPC address for the namenode. */ + private String serviceAddress; + /** Service RPC address for the namenode. */ + private String lifelineAddress; + /** HTTP address for the namenode. */ + private String webAddress; + + /** + * Create a new Namenode status updater. + * @param resolver Namenode resolver service to handle NN registration. + * @param nameserviceId Identifier of the nameservice. + * @param namenodeId Identifier of the namenode in HA. + */ + public NamenodeHeartbeatService( + ActiveNamenodeResolver resolver, String nsId, String nnId) { + super(NamenodeHeartbeatService.class.getSimpleName() + " " + nsId + " " + + nnId); + + this.resolver = resolver; + + this.nameserviceId = nsId; + this.namenodeId = nnId; + + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + + this.conf = configuration; + + if (this.namenodeId != null && !this.namenodeId.isEmpty()) { + this.localTarget = new NNHAServiceTarget( + conf, nameserviceId, namenodeId); + } else { + this.localTarget = null; + } + + // Get the RPC address for the clients to connect + this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); + LOG.info("{}-{} RPC address: {}", + nameserviceId, namenodeId, rpcAddress); + + // Get the Service RPC address for monitoring + this.serviceAddress = + DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); + if (this.serviceAddress == null) { + LOG.error("Cannot locate RPC service address for NN {}-{}, " + + "using RPC address {}", nameserviceId, namenodeId, this.rpcAddress); + this.serviceAddress = this.rpcAddress; + } + LOG.info("{}-{} Service RPC address: {}", + nameserviceId, namenodeId, serviceAddress); + + // Get the Lifeline RPC address for faster monitoring + this.lifelineAddress = + DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId); + if (this.lifelineAddress == null) { + this.lifelineAddress = this.serviceAddress; + } + LOG.info("{}-{} Lifeline RPC address: {}", + nameserviceId, namenodeId, lifelineAddress); + + // Get the Web address for UI + this.webAddress = + DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); + LOG.info("{}-{} Web address: {}", nameserviceId, namenodeId, webAddress); + + this.setIntervalMs(conf.getLong( + DFS_ROUTER_HEARTBEAT_INTERVAL_MS, + DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT)); + + + super.serviceInit(configuration); + } + + @Override + public void periodicInvoke() { + updateState(); + } + + /** + * Get the RPC address for a Namenode. + * @param conf Configuration. + * @param nsId Name service identifier. + * @param nnId Name node identifier. + * @return RPC address in format hostname:1234. + */ + private static String getRpcAddress( + Configuration conf, String nsId, String nnId) { + + // Get it from the regular RPC setting + String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; + String ret = conf.get(confKey); + + if (nsId != null && nnId != null) { + // Get if for the proper nameservice and namenode + confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId); + ret = conf.get(confKey); + + // If not available, get it from the map + if (ret == null) { + Map rpcAddresses = + DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); + if (rpcAddresses.containsKey(nnId)) { + InetSocketAddress sockAddr = rpcAddresses.get(nnId); + InetAddress addr = sockAddr.getAddress(); + ret = addr.getHostAddress() + ":" + sockAddr.getPort(); + } + } + } + return ret; + } + + /** + * Update the state of the Namenode. + */ + private void updateState() { + NamenodeStatusReport report = getNamenodeStatusReport(); + if (!report.registrationValid()) { + // Not operational + LOG.error("Namenode is not operational: {}", getNamenodeDesc()); + } else if (report.haStateValid()) { + // block and HA status available + LOG.debug("Received service state: {} from HA namenode: {}", + report.getState(), getNamenodeDesc()); + } else if (localTarget == null) { + // block info available, HA status not expected + LOG.debug( + "Reporting non-HA namenode as operational: " + getNamenodeDesc()); + } else { + // block info available, HA status should be available, but was not + // fetched do nothing and let the current state stand + return; + } + try { + if (!resolver.registerNamenode(report)) { + LOG.warn("Cannot register namenode {}", report); + } + } catch (IOException e) { + LOG.info("Cannot register namenode in the State Store"); + } catch (Exception ex) { + LOG.error("Unhandled exception updating NN registration for {}", + getNamenodeDesc(), ex); + } + } + + /** + * Get the status report for the Namenode monitored by this heartbeater. + * @return Namenode status report. + */ + protected NamenodeStatusReport getNamenodeStatusReport() { + NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId, + namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress); + + try { + LOG.debug("Probing NN at service address: {}", serviceAddress); + + URI serviceURI = new URI("hdfs://" + serviceAddress); + // Read the filesystem info from RPC (required) + NamenodeProtocol nn = NameNodeProxies + .createProxy(this.conf, serviceURI, NamenodeProtocol.class) + .getProxy(); + + if (nn != null) { + NamespaceInfo info = nn.versionRequest(); + if (info != null) { + report.setNamespaceInfo(info); + } + } + if (!report.registrationValid()) { + return report; + } + + // Check for safemode from the client protocol. Currently optional, but + // should be required at some point for QoS + try { + ClientProtocol client = NameNodeProxies + .createProxy(this.conf, serviceURI, ClientProtocol.class) + .getProxy(); + if (client != null) { + boolean isSafeMode = client.setSafeMode( + SafeModeAction.SAFEMODE_GET, false); + report.setSafeMode(isSafeMode); + } + } catch (Exception e) { + LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e); + } + + // Read the stats from JMX (optional) + updateJMXParameters(webAddress, report); + + if (localTarget != null) { + // Try to get the HA status + try { + // Determine if NN is active + // TODO: dynamic timeout + HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000); + HAServiceStatus status = haProtocol.getServiceStatus(); + report.setHAServiceState(status.getState()); + } catch (Throwable e) { + // Failed to fetch HA status, ignoring failure + LOG.error("Cannot fetch HA status for {}: {}", + getNamenodeDesc(), e.getMessage(), e); + } + } + } catch(IOException e) { + LOG.error("Cannot communicate with {}: {}", + getNamenodeDesc(), e.getMessage()); + } catch(Throwable e) { + // Generic error that we don't know about + LOG.error("Unexpected exception while communicating with {}: {}", + getNamenodeDesc(), e.getMessage(), e); + } + return report; + } + + /** + * Get the description of the Namenode to monitor. + * @return Description of the Namenode to monitor. + */ + public String getNamenodeDesc() { + if (namenodeId != null && !namenodeId.isEmpty()) { + return nameserviceId + "-" + namenodeId + ":" + serviceAddress; + } else { + return nameserviceId + ":" + serviceAddress; + } + } + + /** + * Get the parameters for a Namenode from JMX and add them to the report. + * @param webAddress Web interface of the Namenode to monitor. + * @param report Namenode status report to update with JMX data. + */ + private void updateJMXParameters( + String address, NamenodeStatusReport report) { + try { + // TODO part of this should be moved to its own utility + String query = "Hadoop:service=NameNode,name=FSNamesystem*"; + JSONArray aux = FederationUtil.getJmx(query, address); + if (aux != null) { + for (int i = 0; i < aux.length(); i++) { + JSONObject jsonObject = aux.getJSONObject(i); + String name = jsonObject.getString("name"); + if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) { + report.setDatanodeInfo( + jsonObject.getInt("NumLiveDataNodes"), + jsonObject.getInt("NumDeadDataNodes"), + jsonObject.getInt("NumDecommissioningDataNodes"), + jsonObject.getInt("NumDecomLiveDataNodes"), + jsonObject.getInt("NumDecomDeadDataNodes")); + } else if (name.equals( + "Hadoop:service=NameNode,name=FSNamesystem")) { + report.setNamesystemInfo( + jsonObject.getLong("CapacityRemaining"), + jsonObject.getLong("CapacityTotal"), + jsonObject.getLong("FilesTotal"), + jsonObject.getLong("BlocksTotal"), + jsonObject.getLong("MissingBlocks"), + jsonObject.getLong("PendingReplicationBlocks"), + jsonObject.getLong("UnderReplicatedBlocks"), + jsonObject.getLong("PendingDeletionBlocks")); + } + } + } + } catch (Exception e) { + LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index 019a5cd983..cfddf200b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -25,12 +25,16 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; @@ -85,6 +89,8 @@ public class Router extends CompositeService { /** Interface to identify the active NN for a nameservice or blockpool ID. */ private ActiveNamenodeResolver namenodeResolver; + /** Updates the namenode status in the namenode resolver. */ + private Collection namenodeHearbeatServices; /** Usage string for help message. */ @@ -133,6 +139,22 @@ protected void serviceInit(Configuration configuration) throws Exception { this.setRpcServerAddress(rpcServer.getRpcAddress()); } + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, + DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) { + + // Create status updater for each monitored Namenode + this.namenodeHearbeatServices = createNamenodeHearbeatServices(); + for (NamenodeHeartbeatService hearbeatService : + this.namenodeHearbeatServices) { + addService(hearbeatService); + } + + if (this.namenodeHearbeatServices.isEmpty()) { + LOG.error("Heartbeat is enabled but there are no namenodes to monitor"); + } + } + super.serviceInit(conf); } @@ -242,6 +264,96 @@ public InetSocketAddress getRpcServerAddress() { return this.rpcAddress; } + ///////////////////////////////////////////////////////// + // Namenode heartbeat monitors + ///////////////////////////////////////////////////////// + + /** + * Create each of the services that will monitor a Namenode. + * + * @return List of heartbeat services. + */ + protected Collection + createNamenodeHearbeatServices() { + + Map ret = new HashMap<>(); + + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, + DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) { + // Create a local heartbet service + NamenodeHeartbeatService localHeartbeatService = + createLocalNamenodeHearbeatService(); + if (localHeartbeatService != null) { + String nnDesc = localHeartbeatService.getNamenodeDesc(); + ret.put(nnDesc, localHeartbeatService); + } + } + + // Create heartbeat services for a list specified by the admin + String namenodes = this.conf.get( + DFSConfigKeys.DFS_ROUTER_MONITOR_NAMENODE); + if (namenodes != null) { + for (String namenode : namenodes.split(",")) { + String[] namenodeSplit = namenode.split("\\."); + String nsId = null; + String nnId = null; + if (namenodeSplit.length == 2) { + nsId = namenodeSplit[0]; + nnId = namenodeSplit[1]; + } else if (namenodeSplit.length == 1) { + nsId = namenode; + } else { + LOG.error("Wrong Namenode to monitor: {}", namenode); + } + if (nsId != null) { + NamenodeHeartbeatService heartbeatService = + createNamenodeHearbeatService(nsId, nnId); + if (heartbeatService != null) { + ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); + } + } + } + } + + return ret.values(); + } + + /** + * Create a new status updater for the local Namenode. + * + * @return Updater of the status for the local Namenode. + */ + protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() { + // Detect NN running in this machine + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + String nnId = null; + if (HAUtil.isHAEnabled(conf, nsId)) { + nnId = HAUtil.getNameNodeId(conf, nsId); + if (nnId == null) { + LOG.error("Cannot find namenode id for local {}", nsId); + } + } + + return createNamenodeHearbeatService(nsId, nnId); + } + + /** + * Create a heartbeat monitor for a particular Namenode. + * + * @param nsId Identifier of the nameservice to monitor. + * @param nnId Identifier of the namenode (HA) to monitor. + * @return Updater of the status for the specified Namenode. + */ + protected NamenodeHeartbeatService createNamenodeHearbeatService( + String nsId, String nnId) { + + LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId); + NamenodeHeartbeatService ret = new NamenodeHeartbeatService( + namenodeResolver, nsId, nnId); + return ret; + } + ///////////////////////////////////////////////////////// // Submodule getters ///////////////////////////////////////////////////////// diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c90aa8043e..cc4a2bfdf1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4801,4 +4801,36 @@ + + dfs.federation.router.heartbeat.enable + true + + Enables the Router to heartbeat into the State Store. + + + + + dfs.federation.router.heartbeat.interval + 5000 + + How often the Router should heartbeat into the State Store in milliseconds. + + + + + dfs.federation.router.monitor.namenode + + + The identifier of the namenodes to monitor and heartbeat. + + + + + dfs.federation.router.monitor.localnamenode.enable + true + + If the Router should monitor the namenode in the local machine. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 0345cf5ef5..da91006177 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -582,6 +582,14 @@ public static class NameNodeInfo { public void setStartOpt(StartupOption startOpt) { this.startOpt = startOpt; } + + public String getNameserviceId() { + return this.nameserviceId; + } + + public String getNamenodeId() { + return this.nnId; + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index 2875750840..87427fd6be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -56,9 +56,16 @@ public class MockResolver private Set namespaces = new HashSet<>(); private String defaultNamespace = null; + public MockResolver() { + this.cleanRegistrations(); + } + + public MockResolver(Configuration conf) { + this(); + } public MockResolver(Configuration conf, StateStoreService store) { - this.cleanRegistrations(); + this(); } public void addLocation(String mount, String nsId, String location) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index 39fcf7aa63..21555c55a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -28,6 +28,8 @@ public class RouterConfigBuilder { private Configuration conf; private boolean enableRpcServer = false; + private boolean enableHeartbeat = false; + private boolean enableLocalHeartbeat = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -39,6 +41,13 @@ public RouterConfigBuilder() { public RouterConfigBuilder all() { this.enableRpcServer = true; + this.enableHeartbeat = true; + this.enableLocalHeartbeat = true; + return this; + } + + public RouterConfigBuilder enableLocalHeartbeat(boolean enable) { + this.enableLocalHeartbeat = enable; return this; } @@ -47,12 +56,25 @@ public RouterConfigBuilder rpc(boolean enable) { return this; } + public RouterConfigBuilder heartbeat(boolean enable) { + this.enableHeartbeat = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } + public RouterConfigBuilder heartbeat() { + return this.heartbeat(true); + } + public Configuration build() { conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, + this.enableHeartbeat); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, + this.enableLocalHeartbeat); return conf; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java index 4031b7fb30..0830c197bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.NameNodeInfo; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf; import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf; @@ -753,6 +754,48 @@ public String getNamenodeTestFileForNS(String nsId) { return getNamenodePathForNS(nsId) + "/" + TEST_FILE; } + /** + * Switch a namenode in a nameservice to be the active. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + */ + public void switchToActive(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && + nn.getNamenodeId().equals(nnId)) { + cluster.transitionToActive(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to active", e); + } + } + + /** + * Switch a namenode in a nameservice to be in standby. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier. + */ + public void switchToStandby(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && + nn.getNamenodeId().equals(nnId)) { + cluster.transitionToStandby(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to standby", e); + } + } + /** * Stop the federated HDFS cluster. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java new file mode 100644 index 0000000000..877fb02f44 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestNamenodeHeartbeat.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.service.Service.STATE; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test the service that heartbeats the state of the namenodes to the State + * Store. + */ +public class TestNamenodeHeartbeat { + + private static RouterDFSCluster cluster; + private static ActiveNamenodeResolver namenodeResolver; + private static List services; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void globalSetUp() throws Exception { + + cluster = new RouterDFSCluster(true, 2); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Mock locator that records the heartbeats + List nss = cluster.getNameservices(); + String ns = nss.get(0); + Configuration conf = cluster.generateNamenodeConfiguration(ns); + namenodeResolver = new MockResolver(conf); + namenodeResolver.setRouterId("testrouter"); + + // Create one heartbeat service per NN + services = new ArrayList<>(); + for (NamenodeContext nn : cluster.getNamenodes()) { + String nsId = nn.getNameserviceId(); + String nnId = nn.getNamenodeId(); + NamenodeHeartbeatService service = new NamenodeHeartbeatService( + namenodeResolver, nsId, nnId); + service.init(conf); + service.start(); + services.add(service); + } + } + + @AfterClass + public static void tearDown() throws IOException { + cluster.shutdown(); + for (NamenodeHeartbeatService service: services) { + service.stop(); + service.close(); + } + } + + @Test + public void testNamenodeHeartbeatService() throws IOException { + + RouterDFSCluster testCluster = new RouterDFSCluster(true, 1); + Configuration heartbeatConfig = testCluster.generateNamenodeConfiguration( + NAMESERVICES[0]); + NamenodeHeartbeatService server = new NamenodeHeartbeatService( + namenodeResolver, NAMESERVICES[0], NAMENODES[0]); + server.init(heartbeatConfig); + assertEquals(STATE.INITED, server.getServiceState()); + server.start(); + assertEquals(STATE.STARTED, server.getServiceState()); + server.stop(); + assertEquals(STATE.STOPPED, server.getServiceState()); + server.close(); + } + + @Test + public void testHearbeat() throws InterruptedException, IOException { + + // Set NAMENODE1 to active for all nameservices + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + } + } + + // Wait for heartbeats to record + Thread.sleep(5000); + + // Verify the locator has matching NN entries for each NS + for (String ns : cluster.getNameservices()) { + List nns = + namenodeResolver.getNamenodesForNameserviceId(ns); + + // Active + FederationNamenodeContext active = nns.get(0); + assertEquals(NAMENODES[0], active.getNamenodeId()); + + // Standby + FederationNamenodeContext standby = nns.get(1); + assertEquals(NAMENODES[1], standby.getNamenodeId()); + } + + // Switch active NNs in 1/2 nameservices + List nss = cluster.getNameservices(); + String failoverNS = nss.get(0); + String normalNs = nss.get(1); + + cluster.switchToStandby(failoverNS, NAMENODES[0]); + cluster.switchToActive(failoverNS, NAMENODES[1]); + + // Wait for heartbeats to record + Thread.sleep(5000); + + // Verify the locator has recorded the failover for the failover NS + List failoverNSs = + namenodeResolver.getNamenodesForNameserviceId(failoverNS); + // Active + FederationNamenodeContext active = failoverNSs.get(0); + assertEquals(NAMENODES[1], active.getNamenodeId()); + + // Standby + FederationNamenodeContext standby = failoverNSs.get(1); + assertEquals(NAMENODES[0], standby.getNamenodeId()); + + // Verify the locator has the same records for the other ns + List normalNss = + namenodeResolver.getNamenodesForNameserviceId(normalNs); + // Active + active = normalNss.get(0); + assertEquals(NAMENODES[0], active.getNamenodeId()); + // Standby + standby = normalNss.get(1); + assertEquals(NAMENODES[1], standby.getNamenodeId()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java index d8afb39853..2074d3d0ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java @@ -98,6 +98,9 @@ public void testRouterService() throws InterruptedException, IOException { // Rpc only testRouterStartup(new RouterConfigBuilder(conf).rpc().build()); + // Heartbeat only + testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build()); + // Run with all services testRouterStartup(new RouterConfigBuilder(conf).all().build()); }