diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 362edd1cac..d7ea8a8373 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -205,6 +205,8 @@ Map getActorInfoMap() { info.put("ActorState", getRunningState()); info.put("LastHeartbeat", String.valueOf(getScheduler().getLastHearbeatTime())); + info.put("LastHeartbeatResponseTime", + String.valueOf(getScheduler().getLastHeartbeatResponseTime())); info.put("LastBlockReport", String.valueOf(getScheduler().getLastBlockReportTime())); info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize())); @@ -568,6 +570,8 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) slowPeers, slowDisks); + scheduler.updateLastHeartbeatResponseTime(monotonicNow()); + if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. scheduler.scheduleNextOutlierReport(); @@ -1190,6 +1194,9 @@ static class Scheduler { @VisibleForTesting volatile long lastHeartbeatTime = monotonicNow(); + @VisibleForTesting + private volatile long lastHeartbeatResponseTime = -1; + @VisibleForTesting boolean resetBlockReportTime = true; @@ -1238,6 +1245,10 @@ void updateLastHeartbeatTime(long heartbeatTime) { lastHeartbeatTime = heartbeatTime; } + void updateLastHeartbeatResponseTime(long heartbeatTime) { + this.lastHeartbeatResponseTime = heartbeatTime; + } + void updateLastBlockReportTime(long blockReportTime) { lastBlockReportTime = blockReportTime; } @@ -1250,6 +1261,10 @@ long getLastHearbeatTime() { return (monotonicNow() - lastHeartbeatTime)/1000; } + private long getLastHeartbeatResponseTime() { + return (monotonicNow() - lastHeartbeatResponseTime) / 1000; + } + long getLastBlockReportTime() { return (monotonicNow() - lastBlockReportTime)/1000; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 633befbb5b..402d471087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3496,8 +3496,12 @@ public String getDatanodeHostname() { */ @Override // DataNodeMXBean public String getBPServiceActorInfo() { - final ArrayList> infoArray = - new ArrayList>(); + return JSON.toString(getBPServiceActorInfoMap()); + } + + @VisibleForTesting + public List> getBPServiceActorInfoMap() { + final List> infoArray = new ArrayList<>(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null) { for (BPServiceActor actor : bpos.getBPServiceActors()) { @@ -3505,7 +3509,7 @@ public String getBPServiceActorInfo() { } } } - return JSON.toString(infoArray); + return infoArray; } /** @@ -3684,6 +3688,29 @@ boolean isRestarting() { * @return true - if the data node is fully started */ public boolean isDatanodeFullyStarted() { + return isDatanodeFullyStarted(false); + } + + /** + * A datanode is considered to be fully started if all the BP threads are + * alive and all the block pools are initialized. If checkConnectionToActiveNamenode is true, + * the datanode is considered to be fully started if it is also heartbeating to + * active namenode in addition to the above-mentioned conditions. + * + * @param checkConnectionToActiveNamenode if true, performs additional check of whether datanode + * is heartbeating to active namenode. + * @return true if the datanode is fully started and also conditionally connected to active + * namenode, false otherwise. + */ + public boolean isDatanodeFullyStarted(boolean checkConnectionToActiveNamenode) { + if (checkConnectionToActiveNamenode) { + for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { + if (!bp.isInitialized() || !bp.isAlive() || bp.getActiveNN() == null) { + return false; + } + } + return true; + } for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { if (!bp.isInitialized() || !bp.isAlive()) { return false; @@ -3691,7 +3718,7 @@ public boolean isDatanodeFullyStarted() { } return true; } - + @VisibleForTesting public DatanodeID getDatanodeId() { return id; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html index b491d5a04e..28cba0153c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html @@ -84,7 +84,8 @@ Namenode HA State Block Pool ID Actor State - Last Heartbeat + Last Heartbeat Sent + Last Heartbeat Response Last Block Report Last Block Report Size (Max Size) @@ -96,6 +97,7 @@ {BlockPoolID} {ActorState} {LastHeartbeat}s + {LastHeartbeatResponseTime}s {#helper_relative_time value="{LastBlockReport}"/} {maxBlockReportSize|fmt_bytes} ({maxDataLength|fmt_bytes}) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index d9550cdf70..4dafbbe574 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2508,6 +2508,24 @@ public boolean restartDataNode(DataNodeProperties dnprop) throws IOException { return restartDataNode(dnprop, false); } + /** + * Wait for the datanode to be fully functional i.e. all the BP service threads are alive, + * all block pools initiated and also connected to active namenode. + * + * @param dn Datanode instance. + * @param timeout Timeout in millis until when we should wait for datanode to be fully + * operational. + * @throws InterruptedException If the thread wait is interrupted. + * @throws TimeoutException If times out while awaiting the fully operational capability of + * datanode. + */ + public void waitDatanodeConnectedToActive(DataNode dn, int timeout) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> dn.isDatanodeFullyStarted(true), + 100, timeout, "Datanode is not connected to active namenode even after " + + timeout + " ms of waiting"); + } + public void waitDatanodeFullyStarted(DataNode dn, int timeout) throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(new Supplier() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java index ea43cccbb1..28330139bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java @@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -294,4 +296,77 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws Exception { if (cluster != null) {cluster.shutdown();} } } + + @Test + public void testDataNodeMXBeanLastHeartbeats() throws Exception { + Configuration conf = new Configuration(); + try (MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology(2)) + .build()) { + cluster.waitActive(); + cluster.transitionToActive(0); + cluster.transitionToStandby(1); + + DataNode datanode = cluster.getDataNodes().get(0); + + // Verify and wait until one of the BP service actor identifies active namenode as active + // and another as standby. + cluster.waitDatanodeConnectedToActive(datanode, 5000); + + // Verify that last heartbeat sent to both namenodes in last 5 sec. + assertLastHeartbeatSentTime(datanode, "LastHeartbeat"); + // Verify that last heartbeat response from both namenodes have been received within + // last 5 sec. + assertLastHeartbeatSentTime(datanode, "LastHeartbeatResponseTime"); + + + NameNode sbNameNode = cluster.getNameNode(1); + + // Stopping standby namenode + sbNameNode.stop(); + + // Verify that last heartbeat response time from one of the namenodes would stay much higher + // after stopping one namenode. + GenericTestUtils.waitFor(() -> { + List> bpServiceActorInfo = datanode.getBPServiceActorInfoMap(); + Map bpServiceActorInfo1 = bpServiceActorInfo.get(0); + Map bpServiceActorInfo2 = bpServiceActorInfo.get(1); + + long lastHeartbeatResponseTime1 = + Long.parseLong(bpServiceActorInfo1.get("LastHeartbeatResponseTime")); + long lastHeartbeatResponseTime2 = + Long.parseLong(bpServiceActorInfo2.get("LastHeartbeatResponseTime")); + + LOG.info("Last heartbeat response from namenode 1: {}", lastHeartbeatResponseTime1); + LOG.info("Last heartbeat response from namenode 2: {}", lastHeartbeatResponseTime2); + + return (lastHeartbeatResponseTime1 < 5L && lastHeartbeatResponseTime2 > 5L) || ( + lastHeartbeatResponseTime1 > 5L && lastHeartbeatResponseTime2 < 5L); + + }, 200, 15000, + "Last heartbeat response should be higher than 5s for at least one namenode"); + + // Verify that last heartbeat sent to both namenodes in last 5 sec even though + // the last heartbeat received from one of the namenodes is greater than 5 sec ago. + assertLastHeartbeatSentTime(datanode, "LastHeartbeat"); + } + } + + private static void assertLastHeartbeatSentTime(DataNode datanode, String lastHeartbeat) { + List> bpServiceActorInfo = datanode.getBPServiceActorInfoMap(); + Map bpServiceActorInfo1 = bpServiceActorInfo.get(0); + Map bpServiceActorInfo2 = bpServiceActorInfo.get(1); + + long lastHeartbeatSent1 = + Long.parseLong(bpServiceActorInfo1.get(lastHeartbeat)); + long lastHeartbeatSent2 = + Long.parseLong(bpServiceActorInfo2.get(lastHeartbeat)); + + Assert.assertTrue(lastHeartbeat + " for first bp service actor is higher than 5s", + lastHeartbeatSent1 < 5L); + Assert.assertTrue(lastHeartbeat + " for second bp service actor is higher than 5s", + lastHeartbeatSent2 < 5L); + } + }