From 460a94a10f9c314b77a25e14efbf7c4dc3f5d9aa Mon Sep 17 00:00:00 2001 From: Inigo Goiri Date: Thu, 15 Nov 2018 10:58:57 -0800 Subject: [PATCH] HDFS-14045. Use different metrics in DataNode to better measure latency of heartbeat/blockReports/incrementalBlockReports of Active/Standby NN. Contributed by Jiandan Yang. --- .../src/site/markdown/Metrics.md | 10 +++ .../hdfs/server/datanode/BPOfferService.java | 12 ++-- .../hdfs/server/datanode/BPServiceActor.java | 38 +++++++--- .../server/datanode/BlockPoolManager.java | 15 +++- .../IncrementalBlockReportManager.java | 5 +- .../datanode/metrics/DataNodeMetrics.java | 33 +++++++-- .../server/datanode/TestBPOfferService.java | 12 +++- .../server/datanode/TestBlockPoolManager.java | 4 +- .../server/datanode/TestDataNodeMetrics.java | 72 +++++++++++++++++++ .../server/datanode/TestDatanodeRegister.java | 2 +- 10 files changed, 175 insertions(+), 28 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 83ad40a248..357b705f55 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -367,14 +367,24 @@ Each metrics record contains tags such as SessionId and Hostname as additional i | `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds | | `HeartbeatsNumOps` | Total number of heartbeats | | `HeartbeatsAvgTime` | Average heartbeat time in milliseconds | +| `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to specific serviceId and nnId | +| `HeartbeatsFor`*ServiceId*`-`*NNId*`AvgTime` | Average heartbeat time in milliseconds to specific serviceId and nnId | | `HeartbeatsTotalNumOps` | Total number of heartbeats which is a duplicate of HeartbeatsNumOps | | `HeartbeatsTotalAvgTime` | Average total heartbeat time in milliseconds | +| 
`HeartbeatsTotalFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to specific serviceId and nnId which is a duplicate of `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` | +| `HeartbeatsTotalFor`*ServiceId*`-`*NNId*`AvgTime` | Average total heartbeat time in milliseconds to specific serviceId and nnId | | `LifelinesNumOps` | Total number of lifeline messages | | `LifelinesAvgTime` | Average lifeline message processing time in milliseconds | +| `LifelinesFor`*ServiceId*`-`*NNId*`NumOps` | Total number of lifeline messages to specific serviceId and nnId | +| `LifelinesFor`*ServiceId*`-`*NNId*`AvgTime` | Average lifeline message processing time to specific serviceId and nnId in milliseconds | | `BlockReportsNumOps` | Total number of block report operations | | `BlockReportsAvgTime` | Average time of block report operations in milliseconds | +| `BlockReportsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of block report operations to specific serviceId and nnId | +| `BlockReportsFor`*ServiceId*`-`*NNId*`AvgTime` | Average time of block report operations to specific serviceId and nnId in milliseconds | | `IncrementalBlockReportsNumOps` | Total number of incremental block report operations | | `IncrementalBlockReportsAvgTime` | Average time of incremental block report operations in milliseconds | +| `IncrementalBlockReportsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of incremental block report operations to specific serviceId and nnId | +| `IncrementalBlockReportsFor`*ServiceId*`-`*NNId*`AvgTime` | Average time of incremental block report operations to specific serviceId and nnId in milliseconds | | `CacheReportsNumOps` | Total number of cache report operations | | `CacheReportsAvgTime` | Average time of cache report operations in milliseconds | | `PacketAckRoundTripTimeNanosNumOps` | Total number of ack round trip | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index a25f6a92d8..3233e2c921 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -123,7 +123,7 @@ void writeUnlock() { } BPOfferService( - final String nameserviceId, + final String nameserviceId, List nnIds, List nnAddrs, List lifelineNnAddrs, DataNode dn) { @@ -135,12 +135,13 @@ void writeUnlock() { this.dn = dn; for (int i = 0; i < nnAddrs.size(); ++i) { - this.bpServices.add(new BPServiceActor(nnAddrs.get(i), - lifelineNnAddrs.get(i), this)); + this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i), + nnAddrs.get(i), lifelineNnAddrs.get(i), this)); } } - void refreshNNList(ArrayList addrs, + void refreshNNList(String serviceId, List nnIds, + ArrayList addrs, ArrayList lifelineAddrs) throws IOException { Set oldAddrs = Sets.newHashSet(); for (BPServiceActor actor : bpServices) { @@ -151,7 +152,8 @@ void refreshNNList(ArrayList addrs, // Process added NNs Set addedNNs = Sets.difference(newAddrs, oldAddrs); for (InetSocketAddress addedNN : addedNNs) { - BPServiceActor actor = new BPServiceActor(addedNN, + BPServiceActor actor = new BPServiceActor(serviceId, + nnIds.get(addrs.indexOf(addedNN)), addedNN, lifelineAddrs.get(addrs.indexOf(addedNN)), this); actor.start(); bpServices.add(actor); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 8f7a1861d2..c4faa397bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ 
-100,6 +100,8 @@ enum RunningState { CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED; } + private String serviceId = null; + private String nnId = null; private volatile RunningState runningState = RunningState.CONNECTING; private volatile boolean shouldServiceRun = true; private final DataNode dn; @@ -115,8 +117,8 @@ enum RunningState { final LinkedList bpThreadQueue = new LinkedList(); - BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr, - BPOfferService bpos) { + BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr, + InetSocketAddress lifelineNnAddr, BPOfferService bpos) { this.bpos = bpos; this.dn = bpos.getDataNode(); this.nnAddr = nnAddr; @@ -134,6 +136,12 @@ enum RunningState { dnConf.outliersReportIntervalMs); // get the value of maxDataLength. this.maxDataLength = dnConf.getMaxDataLength(); + if (serviceId != null) { + this.serviceId = serviceId; + } + if (nnId != null) { + this.nnId = nnId; + } } public DatanodeRegistration getBpRegistration() { @@ -354,7 +362,7 @@ List blockReport(long fullBrLeaseId) throws IOException { // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId()); + bpos.getBlockPoolId(), getRpcMetricSuffix()); long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = @@ -417,7 +425,7 @@ List blockReport(long fullBrLeaseId) throws IOException { // Log the block report processing stats from Datanode perspective long brSendCost = monotonicNow() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; - dn.getMetrics().addBlockReport(brSendCost); + dn.getMetrics().addBlockReport(brSendCost, getRpcMetricSuffix()); final int nCmds = cmds.size(); LOG.info((success ? "S" : "Uns") + "uccessfully sent block report 0x" + @@ -439,6 +447,18 @@ List blockReport(long fullBrLeaseId) throws IOException { return cmds.size() == 0 ? 
null : cmds; } + private String getRpcMetricSuffix() { + if (serviceId == null && nnId == null) { + return null; + } else if (serviceId == null && nnId != null) { + return nnId; + } else if (serviceId != null && nnId == null) { + return serviceId; + } else { + return serviceId + "-" + nnId; + } + } + DatanodeCommand cacheReport() throws IOException { // If caching is disabled, do not send a cache report if (dn.getFSDataset().getCacheCapacity() == 0) { @@ -657,7 +677,8 @@ private void offerService() throws Exception { } fullBlockReportLeaseId = resp.getFullBlockReportLeaseId(); } - dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime); + dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime, + getRpcMetricSuffix()); // If the state of this NN has changed (eg STANDBY->ACTIVE) // then let the BPOfferService update itself. @@ -687,7 +708,7 @@ private void offerService() throws Exception { if (!dn.areIBRDisabledForTests() && (ibrManager.sendImmediately()|| sendHeartbeat)) { ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId()); + bpos.getBlockPoolId(), getRpcMetricSuffix()); } List cmds = null; @@ -709,7 +730,7 @@ private void offerService() throws Exception { if (sendHeartbeat) { dn.getMetrics().addHeartbeatTotal( - scheduler.monotonicNow() - startTime); + scheduler.monotonicNow() - startTime, getRpcMetricSuffix()); } // There is no work to do; sleep until hearbeat timer elapses, @@ -1059,7 +1080,8 @@ private void sendLifelineIfDue() throws IOException { return; } sendLifeline(); - dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime); + dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime, + getRpcMetricSuffix()); scheduler.scheduleNextLifeline(scheduler.monotonicNow()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java 
index b03c51120a..9a7b6bcf7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java @@ -216,14 +216,18 @@ private void doRefreshNamenodes( lifelineAddrMap.get(nsToAdd); ArrayList addrs = Lists.newArrayListWithCapacity(nnIdToAddr.size()); + ArrayList nnIds = + Lists.newArrayListWithCapacity(nnIdToAddr.size()); ArrayList lifelineAddrs = Lists.newArrayListWithCapacity(nnIdToAddr.size()); for (String nnId : nnIdToAddr.keySet()) { addrs.add(nnIdToAddr.get(nnId)); + nnIds.add(nnId); lifelineAddrs.add(nnIdToLifelineAddr != null ? nnIdToLifelineAddr.get(nnId) : null); } - BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs); + BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs, + lifelineAddrs); bpByNameserviceId.put(nsToAdd, bpos); offerServices.add(bpos); } @@ -260,17 +264,20 @@ private void doRefreshNamenodes( Lists.newArrayListWithCapacity(nnIdToAddr.size()); ArrayList lifelineAddrs = Lists.newArrayListWithCapacity(nnIdToAddr.size()); + ArrayList nnIds = Lists.newArrayListWithCapacity( + nnIdToAddr.size()); for (String nnId : nnIdToAddr.keySet()) { addrs.add(nnIdToAddr.get(nnId)); lifelineAddrs.add(nnIdToLifelineAddr != null ? 
nnIdToLifelineAddr.get(nnId) : null); + nnIds.add(nnId); } try { UserGroupInformation.getLoginUser() .doAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { - bpos.refreshNNList(addrs, lifelineAddrs); + bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs); return null; } }); @@ -288,8 +295,10 @@ public Object run() throws Exception { */ protected BPOfferService createBPOS( final String nameserviceId, + List nnIds, List nnAddrs, List lifelineNnAddrs) { - return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn); + return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs, + dn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java index 1779374f57..9515b736a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java @@ -194,7 +194,7 @@ private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) { /** Send IBRs to namenode. 
*/ void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration, - String bpid) throws IOException { + String bpid, String nnRpcLatencySuffix) throws IOException { // Generate a list of the pending reports for each storage under the lock final StorageReceivedDeletedBlocks[] reports = generateIBRs(); if (reports.length == 0) { @@ -214,7 +214,8 @@ void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration, } finally { if (success) { - dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime); + dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime, + nnRpcLatencySuffix); lastIBR = startTime; } else { // If we didn't succeed in sending the report, put all of the diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 8f445a6f6e..89cd1cac3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -34,6 +34,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; import org.apache.hadoop.metrics2.source.JvmMetrics; import java.util.concurrent.ThreadLocalRandom; @@ -161,6 +162,10 @@ public class DataNodeMetrics { private MutableCounterLong ecReconstructionWriteTimeMillis; final MetricsRegistry registry = new MetricsRegistry("datanode"); + @Metric("Milliseconds spent on calling NN rpc") + private MutableRatesWithAggregation + nnRpcLatency = registry.newRatesWithAggregation("nnRpcLatency"); + final String name; JvmMetrics jvmMetrics = null; private DataNodeUsageReportUtil 
dnUsageReportUtil; @@ -232,25 +237,41 @@ public static DataNodeMetrics create(Configuration conf, String dnName) { public JvmMetrics getJvmMetrics() { return jvmMetrics; } - - public void addHeartbeat(long latency) { + + public void addHeartbeat(long latency, String rpcMetricSuffix) { heartbeats.add(latency); + if (rpcMetricSuffix != null) { + nnRpcLatency.add("HeartbeatsFor" + rpcMetricSuffix, latency); + } } - public void addHeartbeatTotal(long latency) { + public void addHeartbeatTotal(long latency, String rpcMetricSuffix) { heartbeatsTotal.add(latency); + if (rpcMetricSuffix != null) { + nnRpcLatency.add("HeartbeatsTotalFor" + rpcMetricSuffix, latency); + } } - public void addLifeline(long latency) { + public void addLifeline(long latency, String rpcMetricSuffix) { lifelines.add(latency); + if (rpcMetricSuffix != null) { + nnRpcLatency.add("LifelinesFor" + rpcMetricSuffix, latency); + } } - public void addBlockReport(long latency) { + public void addBlockReport(long latency, String rpcMetricSuffix) { blockReports.add(latency); + if (rpcMetricSuffix != null) { + nnRpcLatency.add("BlockReportsFor" + rpcMetricSuffix, latency); + } } - public void addIncrementalBlockReport(long latency) { + public void addIncrementalBlockReport(long latency, + String rpcMetricSuffix) { incrementalBlockReports.add(latency); + if (rpcMetricSuffix != null) { + nnRpcLatency.add("IncrementalBlockReportsFor" + rpcMetricSuffix, latency); + } } public void addCacheReport(long latency) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 26a9f378c9..1dc9aa9fe8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -436,13 
+436,16 @@ private BPOfferService setupBPOSForNNs(DataNode mockDn, // function to return the corresponding proxies. final Map nnMap = Maps.newLinkedHashMap(); + List nnIds = Lists.newArrayListWithCapacity(nns.length); for (int port = 0; port < nns.length; port++) { nnMap.put(new InetSocketAddress(port), nns[port]); Mockito.doReturn(nns[port]).when(mockDn).connectToNN( Mockito.eq(new InetSocketAddress(port))); + nnIds.add("nn" + port); } - return new BPOfferService("test_ns", Lists.newArrayList(nnMap.keySet()), + return new BPOfferService("test_ns", nnIds, + Lists.newArrayList(nnMap.keySet()), Collections.nCopies(nnMap.size(), null), mockDn); } @@ -912,7 +915,12 @@ public void testRefreshNameNodes() throws Exception { addrs.add(new InetSocketAddress(2)); lifelineAddrs.add(null); - bpos.refreshNNList(addrs, lifelineAddrs); + ArrayList nnIds = new ArrayList<>(addrs.size()); + for (int i = 0; i < addrs.size(); i++) { + nnIds.add("nn" + i); + } + + bpos.refreshNNList("serviceId", nnIds, addrs, lifelineAddrs); assertEquals(2, bpos.getBPServiceActors().size()); // wait for handshake to run diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java index e061e18242..65ff9b0ff9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java @@ -54,6 +54,7 @@ public void setupBPM() { @Override protected BPOfferService createBPOS( final String nameserviceId, + List nnIds, List nnAddrs, List lifelineNnAddrs) { final int idx = mockIdx++; @@ -69,7 +70,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { doLog("refresh #" + idx); return null; } - }).when(bpos).refreshNNList( + 
}).when(bpos).refreshNNList(Mockito.anyString(), + Mockito.>any(), Mockito.>any(), Mockito.>any()); } catch (IOException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index 98ccd8eea6..b4e26405ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; @@ -27,6 +28,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.List; @@ -35,6 +37,7 @@ import com.google.common.collect.Lists; import net.jcip.annotations.NotThreadSafe; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -428,4 +431,73 @@ public Boolean get() { } }, 1000, 6000); } + + @Test + public void testNNRpcMetricsWithNonHA() throws IOException { + Configuration conf = new HdfsConfiguration(); + // setting heartbeat interval to 1 hour to prevent bpServiceActor sends + // heartbeat periodically to NN during running test case, and bpServiceActor + // only sends heartbeat once after startup + conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS); + MiniDFSCluster 
cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); + assertCounter("HeartbeatsNumOps", 1L, rb); + } + + @Test + public void testNNRpcMetricsWithHA() throws IOException { + Configuration conf = new HdfsConfiguration(); + // setting heartbeat interval to 1 hour to prevent bpServiceActor sends + // heartbeat periodically to NN during running test case, and bpServiceActor + // only sends heartbeat once after startup + conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology( + MiniDFSNNTopology.simpleHATopology()).build(); + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + cluster.transitionToActive(0); + MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); + assertCounter("HeartbeatsForminidfs-ns-nn1NumOps", 1L, rb); + assertCounter("HeartbeatsForminidfs-ns-nn2NumOps", 1L, rb); + assertCounter("HeartbeatsNumOps", 2L, rb); + } + + @Test + public void testNNRpcMetricsWithFederation() throws IOException { + Configuration conf = new HdfsConfiguration(); + // setting heartbeat interval to 1 hour to prevent bpServiceActor sends + // heartbeat periodically to NN during running test case, and bpServiceActor + // only sends heartbeat once after startup + conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology( + MiniDFSNNTopology.simpleFederatedTopology("ns1,ns2")).build(); + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); + assertCounter("HeartbeatsForns1NumOps", 1L, rb); + assertCounter("HeartbeatsForns2NumOps", 1L, rb); + assertCounter("HeartbeatsNumOps", 2L, rb); + } + + @Test + public void testNNRpcMetricsWithFederationAndHA() throws IOException { + 
Configuration conf = new HdfsConfiguration(); + // setting heartbeat interval to 1 hour to prevent bpServiceActor sends + // heartbeat periodically to NN during running test case, and bpServiceActor + // only sends heartbeat once after startup + conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology( + MiniDFSNNTopology.simpleHAFederatedTopology(2)).build(); + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); + + assertCounter("HeartbeatsForns0-nn0NumOps", 1L, rb); + assertCounter("HeartbeatsForns0-nn1NumOps", 1L, rb); + assertCounter("HeartbeatsForns1-nn0NumOps", 1L, rb); + assertCounter("HeartbeatsForns1-nn1NumOps", 1L, rb); + assertCounter("HeartbeatsNumOps", 4L, rb); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java index 38eb0545c2..13fe9e39b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java @@ -62,7 +62,7 @@ public void setUp() throws IOException { BPOfferService mockBPOS = mock(BPOfferService.class); doReturn(mockDN).when(mockBPOS).getDataNode(); - actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS); + actor = new BPServiceActor("test", "test", INVALID_ADDR, null, mockBPOS); fakeNsInfo = mock(NamespaceInfo.class); // Return a a good software version.