HDFS-14045. Use different metrics in DataNode to better measure latency of heartbeat/blockReports/incrementalBlockReports of Active/Standby NN. Contributed by Jiandan Yang.

This commit is contained in:
Inigo Goiri 2018-11-15 10:58:57 -08:00
parent 993c2140cc
commit 460a94a10f
10 changed files with 175 additions and 28 deletions

View File

@ -367,14 +367,24 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds |
| `HeartbeatsNumOps` | Total number of heartbeats |
| `HeartbeatsAvgTime` | Average heartbeat time in milliseconds |
| `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to specific serviceId and nnId |
| `HeartbeatsFor`*ServiceId*`-`*NNId*`AvgTime` | Average heartbeat time in milliseconds to specific serviceId and nnId |
| `HeartbeatsTotalNumOps` | Total number of heartbeats which is a duplicate of HeartbeatsNumOps |
| `HeartbeatsTotalAvgTime` | Average total heartbeat time in milliseconds |
| `HeartbeatsTotalFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to specific serviceId and nnId which is a duplicate of `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` |
| `HeartbeatsTotalFor`*ServiceId*`-`*NNId*`AvgTime` | Average total heartbeat time in milliseconds to specific serviceId and nnId |
| `LifelinesNumOps` | Total number of lifeline messages |
| `LifelinesAvgTime` | Average lifeline message processing time in milliseconds |
| `LifelinesFor`*ServiceId*`-`*NNId*`NumOps` | Total number of lifeline messages to specific serviceId and nnId |
| `LifelinesFor`*ServiceId*`-`*NNId*`AvgTime` | Average lifeline message processing time to specific serviceId and nnId in milliseconds |
| `BlockReportsNumOps` | Total number of block report operations |
| `BlockReportsAvgTime` | Average time of block report operations in milliseconds |
| `BlockReports`*ServiceId*`-`*NNId*`NumOps` | Total number of block report operations to specific serviceId and nnId |
| `BlockReports`*ServiceId*`-`*NNId*`AvgTime` | Average time of block report operations to specific serviceId and nnId in milliseconds |
| `IncrementalBlockReportsNumOps` | Total number of incremental block report operations |
| `IncrementalBlockReportsAvgTime` | Average time of incremental block report operations in milliseconds |
| `IncrementalBlockReports`*ServiceId*`-`*NNId*`NumOps` | Total number of incremental block report operations to specific serviceId and nnId |
| `IncrementalBlockReports`*ServiceId*`-`*NNId*`AvgTime` | Average time of incremental block report operations to specific serviceId and nnId in milliseconds |
| `CacheReportsNumOps` | Total number of cache report operations |
| `CacheReportsAvgTime` | Average time of cache report operations in milliseconds |
| `PacketAckRoundTripTimeNanosNumOps` | Total number of ack round trip |

View File

@ -123,7 +123,7 @@ void writeUnlock() {
}
BPOfferService(
final String nameserviceId,
final String nameserviceId, List<String> nnIds,
List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs,
DataNode dn) {
@ -135,12 +135,13 @@ void writeUnlock() {
this.dn = dn;
for (int i = 0; i < nnAddrs.size(); ++i) {
this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
lifelineNnAddrs.get(i), this));
this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i),
nnAddrs.get(i), lifelineNnAddrs.get(i), this));
}
}
void refreshNNList(ArrayList<InetSocketAddress> addrs,
void refreshNNList(String serviceId, List<String> nnIds,
ArrayList<InetSocketAddress> addrs,
ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
for (BPServiceActor actor : bpServices) {
@ -151,7 +152,8 @@ void refreshNNList(ArrayList<InetSocketAddress> addrs,
// Process added NNs
Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
for (InetSocketAddress addedNN : addedNNs) {
BPServiceActor actor = new BPServiceActor(addedNN,
BPServiceActor actor = new BPServiceActor(serviceId,
nnIds.get(addrs.indexOf(addedNN)), addedNN,
lifelineAddrs.get(addrs.indexOf(addedNN)), this);
actor.start();
bpServices.add(actor);

View File

@ -100,6 +100,8 @@ enum RunningState {
CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
}
private String serviceId = null;
private String nnId = null;
private volatile RunningState runningState = RunningState.CONNECTING;
private volatile boolean shouldServiceRun = true;
private final DataNode dn;
@ -115,8 +117,8 @@ enum RunningState {
final LinkedList<BPServiceActorAction> bpThreadQueue
= new LinkedList<BPServiceActorAction>();
BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
BPOfferService bpos) {
BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
this.bpos = bpos;
this.dn = bpos.getDataNode();
this.nnAddr = nnAddr;
@ -134,6 +136,12 @@ enum RunningState {
dnConf.outliersReportIntervalMs);
// get the value of maxDataLength.
this.maxDataLength = dnConf.getMaxDataLength();
if (serviceId != null) {
this.serviceId = serviceId;
}
if (nnId != null) {
this.nnId = nnId;
}
}
public DatanodeRegistration getBpRegistration() {
@ -354,7 +362,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId());
bpos.getBlockPoolId(), getRpcMetricSuffix());
long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@ -417,7 +425,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// Log the block report processing stats from Datanode perspective
long brSendCost = monotonicNow() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
dn.getMetrics().addBlockReport(brSendCost, getRpcMetricSuffix());
final int nCmds = cmds.size();
LOG.info((success ? "S" : "Uns") +
"uccessfully sent block report 0x" +
@ -439,6 +447,18 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
return cmds.size() == 0 ? null : cmds;
}
private String getRpcMetricSuffix() {
if (serviceId == null && nnId == null) {
return null;
} else if (serviceId == null && nnId != null) {
return nnId;
} else if (serviceId != null && nnId == null) {
return serviceId;
} else {
return serviceId + "-" + nnId;
}
}
DatanodeCommand cacheReport() throws IOException {
// If caching is disabled, do not send a cache report
if (dn.getFSDataset().getCacheCapacity() == 0) {
@ -657,7 +677,8 @@ private void offerService() throws Exception {
}
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
getRpcMetricSuffix());
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
@ -687,7 +708,7 @@ private void offerService() throws Exception {
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately()|| sendHeartbeat)) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId());
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
List<DatanodeCommand> cmds = null;
@ -709,7 +730,7 @@ private void offerService() throws Exception {
if (sendHeartbeat) {
dn.getMetrics().addHeartbeatTotal(
scheduler.monotonicNow() - startTime);
scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
}
// There is no work to do; sleep until hearbeat timer elapses,
@ -1059,7 +1080,8 @@ private void sendLifelineIfDue() throws IOException {
return;
}
sendLifeline();
dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime,
getRpcMetricSuffix());
scheduler.scheduleNextLifeline(scheduler.monotonicNow());
}

View File

@ -216,14 +216,18 @@ private void doRefreshNamenodes(
lifelineAddrMap.get(nsToAdd);
ArrayList<InetSocketAddress> addrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<String> nnIds =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<InetSocketAddress> lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
nnIds.add(nnId);
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
}
BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
lifelineAddrs);
bpByNameserviceId.put(nsToAdd, bpos);
offerServices.add(bpos);
}
@ -260,17 +264,20 @@ private void doRefreshNamenodes(
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<InetSocketAddress> lifelineAddrs =
Lists.newArrayListWithCapacity(nnIdToAddr.size());
ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
nnIdToAddr.size());
for (String nnId : nnIdToAddr.keySet()) {
addrs.add(nnIdToAddr.get(nnId));
lifelineAddrs.add(nnIdToLifelineAddr != null ?
nnIdToLifelineAddr.get(nnId) : null);
nnIds.add(nnId);
}
try {
UserGroupInformation.getLoginUser()
.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
bpos.refreshNNList(addrs, lifelineAddrs);
bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
return null;
}
});
@ -288,8 +295,10 @@ public Object run() throws Exception {
*/
protected BPOfferService createBPOS(
final String nameserviceId,
List<String> nnIds,
List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs) {
return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs,
dn);
}
}

View File

@ -194,7 +194,7 @@ private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
/** Send IBRs to namenode. */
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
String bpid) throws IOException {
String bpid, String nnRpcLatencySuffix) throws IOException {
// Generate a list of the pending reports for each storage under the lock
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
if (reports.length == 0) {
@ -214,7 +214,8 @@ void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
} finally {
if (success) {
dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime,
nnRpcLatencySuffix);
lastIBR = startTime;
} else {
// If we didn't succeed in sending the report, put all of the

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import java.util.concurrent.ThreadLocalRandom;
@ -161,6 +162,10 @@ public class DataNodeMetrics {
private MutableCounterLong ecReconstructionWriteTimeMillis;
final MetricsRegistry registry = new MetricsRegistry("datanode");
@Metric("Milliseconds spent on calling NN rpc")
private MutableRatesWithAggregation
nnRpcLatency = registry.newRatesWithAggregation("nnRpcLatency");
final String name;
JvmMetrics jvmMetrics = null;
private DataNodeUsageReportUtil dnUsageReportUtil;
@ -232,25 +237,41 @@ public static DataNodeMetrics create(Configuration conf, String dnName) {
public JvmMetrics getJvmMetrics() {
return jvmMetrics;
}
public void addHeartbeat(long latency) {
public void addHeartbeat(long latency, String rpcMetricSuffix) {
heartbeats.add(latency);
if (rpcMetricSuffix != null) {
nnRpcLatency.add("HeartbeatsFor" + rpcMetricSuffix, latency);
}
}
public void addHeartbeatTotal(long latency) {
public void addHeartbeatTotal(long latency, String rpcMetricSuffix) {
heartbeatsTotal.add(latency);
if (rpcMetricSuffix != null) {
nnRpcLatency.add("HeartbeatsTotalFor" + rpcMetricSuffix, latency);
}
}
public void addLifeline(long latency) {
public void addLifeline(long latency, String rpcMetricSuffix) {
lifelines.add(latency);
if (rpcMetricSuffix != null) {
nnRpcLatency.add("LifelinesFor" + rpcMetricSuffix, latency);
}
}
public void addBlockReport(long latency) {
public void addBlockReport(long latency, String rpcMetricSuffix) {
blockReports.add(latency);
if (rpcMetricSuffix != null) {
nnRpcLatency.add("BlockReportsFor" + rpcMetricSuffix, latency);
}
}
public void addIncrementalBlockReport(long latency) {
public void addIncrementalBlockReport(long latency,
String rpcMetricSuffix) {
incrementalBlockReports.add(latency);
if (rpcMetricSuffix != null) {
nnRpcLatency.add("IncrementalBlockReportsFor" + rpcMetricSuffix, latency);
}
}
public void addCacheReport(long latency) {

View File

@ -436,13 +436,16 @@ private BPOfferService setupBPOSForNNs(DataNode mockDn,
// function to return the corresponding proxies.
final Map<InetSocketAddress, DatanodeProtocolClientSideTranslatorPB> nnMap = Maps.newLinkedHashMap();
List<String> nnIds = Lists.newArrayListWithCapacity(nns.length);
for (int port = 0; port < nns.length; port++) {
nnMap.put(new InetSocketAddress(port), nns[port]);
Mockito.doReturn(nns[port]).when(mockDn).connectToNN(
Mockito.eq(new InetSocketAddress(port)));
nnIds.add("nn" + port);
}
return new BPOfferService("test_ns", Lists.newArrayList(nnMap.keySet()),
return new BPOfferService("test_ns", nnIds,
Lists.newArrayList(nnMap.keySet()),
Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
}
@ -912,7 +915,12 @@ public void testRefreshNameNodes() throws Exception {
addrs.add(new InetSocketAddress(2));
lifelineAddrs.add(null);
bpos.refreshNNList(addrs, lifelineAddrs);
ArrayList<String> nnIds = new ArrayList<>(addrs.size());
for (int i = 0; i < addrs.size(); i++) {
nnIds.add("nn" + i);
}
bpos.refreshNNList("serviceId", nnIds, addrs, lifelineAddrs);
assertEquals(2, bpos.getBPServiceActors().size());
// wait for handshake to run

View File

@ -54,6 +54,7 @@ public void setupBPM() {
@Override
protected BPOfferService createBPOS(
final String nameserviceId,
List<String> nnIds,
List<InetSocketAddress> nnAddrs,
List<InetSocketAddress> lifelineNnAddrs) {
final int idx = mockIdx++;
@ -69,7 +70,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
doLog("refresh #" + idx);
return null;
}
}).when(bpos).refreshNNList(
}).when(bpos).refreshNNList(Mockito.anyString(),
Mockito.<List<String>>any(),
Mockito.<ArrayList<InetSocketAddress>>any(),
Mockito.<ArrayList<InetSocketAddress>>any());
} catch (IOException e) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@ -27,6 +28,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
@ -35,6 +37,7 @@
import com.google.common.collect.Lists;
import net.jcip.annotations.NotThreadSafe;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@ -428,4 +431,73 @@ public Boolean get() {
}
}, 1000, 6000);
}
@Test
public void testNNRpcMetricsWithNonHA() throws IOException {
Configuration conf = new HdfsConfiguration();
// setting heartbeat interval to 1 hour to prevent bpServiceActor sends
// heartbeat periodically to NN during running test case, and bpServiceActor
// only sends heartbeat once after startup
conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsNumOps", 1L, rb);
}
@Test
public void testNNRpcMetricsWithHA() throws IOException {
Configuration conf = new HdfsConfiguration();
// setting heartbeat interval to 1 hour to prevent bpServiceActor sends
// heartbeat periodically to NN during running test case, and bpServiceActor
// only sends heartbeat once after startup
conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
MiniDFSNNTopology.simpleHATopology()).build();
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
cluster.transitionToActive(0);
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsForminidfs-ns-nn1NumOps", 1L, rb);
assertCounter("HeartbeatsForminidfs-ns-nn2NumOps", 1L, rb);
assertCounter("HeartbeatsNumOps", 2L, rb);
}
@Test
public void testNNRpcMetricsWithFederation() throws IOException {
Configuration conf = new HdfsConfiguration();
// setting heartbeat interval to 1 hour to prevent bpServiceActor sends
// heartbeat periodically to NN during running test case, and bpServiceActor
// only sends heartbeat once after startup
conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
MiniDFSNNTopology.simpleFederatedTopology("ns1,ns2")).build();
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsForns1NumOps", 1L, rb);
assertCounter("HeartbeatsForns2NumOps", 1L, rb);
assertCounter("HeartbeatsNumOps", 2L, rb);
}
@Test
public void testNNRpcMetricsWithFederationAndHA() throws IOException {
Configuration conf = new HdfsConfiguration();
// setting heartbeat interval to 1 hour to prevent bpServiceActor sends
// heartbeat periodically to NN during running test case, and bpServiceActor
// only sends heartbeat once after startup
conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
MiniDFSNNTopology.simpleHAFederatedTopology(2)).build();
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsForns0-nn0NumOps", 1L, rb);
assertCounter("HeartbeatsForns0-nn1NumOps", 1L, rb);
assertCounter("HeartbeatsForns1-nn0NumOps", 1L, rb);
assertCounter("HeartbeatsForns1-nn1NumOps", 1L, rb);
assertCounter("HeartbeatsNumOps", 4L, rb);
}
}

View File

@ -62,7 +62,7 @@ public void setUp() throws IOException {
BPOfferService mockBPOS = mock(BPOfferService.class);
doReturn(mockDN).when(mockBPOS).getDataNode();
actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS);
actor = new BPServiceActor("test", "test", INVALID_ADDR, null, mockBPOS);
fakeNsInfo = mock(NamespaceInfo.class);
// Return a a good software version.