HDFS-11534. Add counters for number of blocks in pending IBR. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
d1b7439b48
commit
1168ece596
@ -126,7 +126,9 @@ static enum RunningState {
|
||||
this.initialRegistrationComplete = lifelineNnAddr != null ?
|
||||
new CountDownLatch(1) : null;
|
||||
this.dnConf = dn.getDnConf();
|
||||
this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval);
|
||||
this.ibrManager = new IncrementalBlockReportManager(
|
||||
dnConf.ibrInterval,
|
||||
dn.getMetrics());
|
||||
prevBlockReportId = ThreadLocalRandom.current().nextLong();
|
||||
scheduler = new Scheduler(dnConf.heartBeatInterval,
|
||||
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval,
|
||||
@ -350,7 +352,7 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
|
||||
// or we will report an RBW replica after the BlockReport already reports
|
||||
// a FINALIZED one.
|
||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||
bpos.getBlockPoolId(), dn.getMetrics());
|
||||
bpos.getBlockPoolId());
|
||||
|
||||
long brCreateStartTime = monotonicNow();
|
||||
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
||||
@ -678,7 +680,7 @@ private void offerService() throws Exception {
|
||||
}
|
||||
if (ibrManager.sendImmediately() || sendHeartbeat) {
|
||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||
bpos.getBlockPoolId(), dn.getMetrics());
|
||||
bpos.getBlockPoolId());
|
||||
}
|
||||
|
||||
List<DatanodeCommand> cmds = null;
|
||||
|
@ -52,6 +52,11 @@ private static class PerStorageIBR {
|
||||
/** The blocks in this IBR. */
|
||||
final Map<Block, ReceivedDeletedBlockInfo> blocks = Maps.newHashMap();
|
||||
|
||||
private DataNodeMetrics dnMetrics;
|
||||
PerStorageIBR(final DataNodeMetrics dnMetrics) {
|
||||
this.dnMetrics = dnMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the given block from this IBR
|
||||
* @return true if the block was removed; otherwise, return false.
|
||||
@ -76,6 +81,25 @@ ReceivedDeletedBlockInfo[] removeAll() {
|
||||
/** Put the block to this IBR. */
|
||||
void put(ReceivedDeletedBlockInfo rdbi) {
|
||||
blocks.put(rdbi.getBlock(), rdbi);
|
||||
increaseBlocksCounter(rdbi);
|
||||
}
|
||||
|
||||
private void increaseBlocksCounter(
|
||||
final ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
|
||||
switch (receivedDeletedBlockInfo.getStatus()) {
|
||||
case RECEIVING_BLOCK:
|
||||
dnMetrics.incrBlocksReceivingInPendingIBR();
|
||||
break;
|
||||
case RECEIVED_BLOCK:
|
||||
dnMetrics.incrBlocksReceivedInPendingIBR();
|
||||
break;
|
||||
case DELETED_BLOCK:
|
||||
dnMetrics.incrBlocksDeletedInPendingIBR();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
dnMetrics.incrBlocksInPendingIBR();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -114,10 +138,14 @@ int putMissing(ReceivedDeletedBlockInfo[] rdbis) {
|
||||
|
||||
/** The timestamp of the last IBR. */
|
||||
private volatile long lastIBR;
|
||||
private DataNodeMetrics dnMetrics;
|
||||
|
||||
IncrementalBlockReportManager(final long ibrInterval) {
|
||||
IncrementalBlockReportManager(
|
||||
final long ibrInterval,
|
||||
final DataNodeMetrics dnMetrics) {
|
||||
this.ibrInterval = ibrInterval;
|
||||
this.lastIBR = monotonicNow() - ibrInterval;
|
||||
this.dnMetrics = dnMetrics;
|
||||
}
|
||||
|
||||
boolean sendImmediately() {
|
||||
@ -147,6 +175,10 @@ private synchronized StorageReceivedDeletedBlocks[] generateIBRs() {
|
||||
reports.add(new StorageReceivedDeletedBlocks(entry.getKey(), rdbi));
|
||||
}
|
||||
}
|
||||
|
||||
/* set blocks to zero */
|
||||
this.dnMetrics.resetBlocksInPendingIBR();
|
||||
|
||||
readyToSend = false;
|
||||
return reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]);
|
||||
}
|
||||
@ -162,7 +194,7 @@ private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
|
||||
|
||||
/** Send IBRs to namenode. */
|
||||
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
|
||||
String bpid, DataNodeMetrics metrics) throws IOException {
|
||||
String bpid) throws IOException {
|
||||
// Generate a list of the pending reports for each storage under the lock
|
||||
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
|
||||
if (reports.length == 0) {
|
||||
@ -180,8 +212,9 @@ void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
|
||||
namenode.blockReceivedAndDeleted(registration, bpid, reports);
|
||||
success = true;
|
||||
} finally {
|
||||
metrics.addIncrementalBlockReport(monotonicNow() - startTime);
|
||||
|
||||
if (success) {
|
||||
dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
|
||||
lastIBR = startTime;
|
||||
} else {
|
||||
// If we didn't succeed in sending the report, put all of the
|
||||
@ -199,7 +232,7 @@ private PerStorageIBR getPerStorageIBR(DatanodeStorage storage) {
|
||||
// This is the first time we are adding incremental BR state for
|
||||
// this storage so create a new map. This is required once per
|
||||
// storage, per service actor.
|
||||
perStorage = new PerStorageIBR();
|
||||
perStorage = new PerStorageIBR(dnMetrics);
|
||||
pendingIBRs.put(storage, perStorage);
|
||||
}
|
||||
return perStorage;
|
||||
|
@ -31,6 +31,7 @@
|
||||
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
@ -130,6 +131,14 @@ public class DataNodeMetrics {
|
||||
@Metric MutableRate sendDataPacketTransferNanos;
|
||||
final MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
|
||||
|
||||
@Metric("Count of blocks in pending IBR")
|
||||
private MutableGaugeLong blocksInPendingIBR;
|
||||
@Metric("Count of blocks at receiving status in pending IBR")
|
||||
private MutableGaugeLong blocksReceivingInPendingIBR;
|
||||
@Metric("Count of blocks at received status in pending IBR")
|
||||
private MutableGaugeLong blocksReceivedInPendingIBR;
|
||||
@Metric("Count of blocks at deleted status in pending IBR")
|
||||
private MutableGaugeLong blocksDeletedInPendingIBR;
|
||||
@Metric("Count of erasure coding reconstruction tasks")
|
||||
MutableCounterLong ecReconstructionTasks;
|
||||
@Metric("Count of erasure coding failed reconstruction tasks")
|
||||
@ -433,6 +442,32 @@ public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets blocks in pending IBR to zero.
|
||||
*/
|
||||
public void resetBlocksInPendingIBR() {
|
||||
blocksInPendingIBR.set(0);
|
||||
blocksReceivingInPendingIBR.set(0);
|
||||
blocksReceivedInPendingIBR.set(0);
|
||||
blocksDeletedInPendingIBR.set(0);
|
||||
}
|
||||
|
||||
public void incrBlocksInPendingIBR() {
|
||||
blocksInPendingIBR.incr();
|
||||
}
|
||||
|
||||
public void incrBlocksReceivingInPendingIBR() {
|
||||
blocksReceivingInPendingIBR.incr();
|
||||
}
|
||||
|
||||
public void incrBlocksReceivedInPendingIBR() {
|
||||
blocksReceivedInPendingIBR.incr();
|
||||
}
|
||||
|
||||
public void incrBlocksDeletedInPendingIBR() {
|
||||
blocksDeletedInPendingIBR.incr();
|
||||
}
|
||||
|
||||
public void incrECReconstructionTasks() {
|
||||
ecReconstructionTasks.incr();
|
||||
}
|
||||
|
@ -0,0 +1,146 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.test.MetricsAsserts;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Test counters for number of blocks in pending IBR.
|
||||
*/
|
||||
public class TestBlockCountersInPendingIBR {
|
||||
|
||||
@Test
|
||||
public void testBlockCounters() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
|
||||
/*
|
||||
* Set a really long value for dfs.blockreport.intervalMsec and
|
||||
* dfs.heartbeat.interval, so that incremental block reports and heartbeats
|
||||
* won't be sent during this test unless they're triggered manually.
|
||||
*/
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10800000L);
|
||||
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L);
|
||||
|
||||
final MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
cluster.waitActive();
|
||||
final DatanodeProtocolClientSideTranslatorPB spy =
|
||||
InternalDataNodeTestUtils.spyOnBposToNN(
|
||||
cluster.getDataNodes().get(0), cluster.getNameNode());
|
||||
final DataNode datanode = cluster.getDataNodes().get(0);
|
||||
|
||||
/* We should get 0 incremental block report. */
|
||||
Mockito.verify(spy, timeout(60000).times(0)).blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
/*
|
||||
* Create fake blocks notification on the DataNode. This will be sent with
|
||||
* the next incremental block report.
|
||||
*/
|
||||
final BPServiceActor actor =
|
||||
datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
|
||||
final FsDatasetSpi<?> dataset = datanode.getFSDataset();
|
||||
final DatanodeStorage storage;
|
||||
try (FsDatasetSpi.FsVolumeReferences volumes =
|
||||
dataset.getFsVolumeReferences()) {
|
||||
storage = dataset.getStorage(volumes.get(0).getStorageID());
|
||||
}
|
||||
|
||||
ReceivedDeletedBlockInfo rdbi = null;
|
||||
/* block at status of RECEIVING_BLOCK */
|
||||
rdbi = new ReceivedDeletedBlockInfo(
|
||||
new Block(5678, 512, 1000), BlockStatus.RECEIVING_BLOCK, null);
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
|
||||
/* block at status of RECEIVED_BLOCK */
|
||||
rdbi = new ReceivedDeletedBlockInfo(
|
||||
new Block(5679, 512, 1000), BlockStatus.RECEIVED_BLOCK, null);
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
|
||||
/* block at status of DELETED_BLOCK */
|
||||
rdbi = new ReceivedDeletedBlockInfo(
|
||||
new Block(5680, 512, 1000), BlockStatus.DELETED_BLOCK, null);
|
||||
actor.getIbrManager().addRDBI(rdbi, storage);
|
||||
|
||||
/* verify counters before sending IBR */
|
||||
verifyBlockCounters(datanode, 3, 1, 1, 1);
|
||||
|
||||
/* Manually trigger a block report. */
|
||||
datanode.triggerBlockReport(
|
||||
new BlockReportOptions.Factory().
|
||||
setIncremental(true).
|
||||
build()
|
||||
);
|
||||
|
||||
/*
|
||||
* triggerBlockReport returns before the block report is actually sent. Wait
|
||||
* for it to be sent here.
|
||||
*/
|
||||
Mockito.verify(spy, timeout(60000).times(1)).
|
||||
blockReceivedAndDeleted(
|
||||
any(DatanodeRegistration.class),
|
||||
anyString(),
|
||||
any(StorageReceivedDeletedBlocks[].class));
|
||||
|
||||
/* verify counters after sending IBR */
|
||||
verifyBlockCounters(datanode, 0, 0, 0, 0);
|
||||
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
|
||||
private void verifyBlockCounters(final DataNode datanode,
|
||||
final long blocksInPendingIBR, final long blocksReceivingInPendingIBR,
|
||||
final long blocksReceivedInPendingIBR,
|
||||
final long blocksDeletedInPendingIBR) {
|
||||
|
||||
final MetricsRecordBuilder m = MetricsAsserts
|
||||
.getMetrics(datanode.getMetrics().name());
|
||||
|
||||
MetricsAsserts.assertGauge("BlocksInPendingIBR",
|
||||
blocksInPendingIBR, m);
|
||||
MetricsAsserts.assertGauge("BlocksReceivingInPendingIBR",
|
||||
blocksReceivingInPendingIBR, m);
|
||||
MetricsAsserts.assertGauge("BlocksReceivedInPendingIBR",
|
||||
blocksReceivedInPendingIBR, m);
|
||||
MetricsAsserts.assertGauge("BlocksDeletedInPendingIBR",
|
||||
blocksDeletedInPendingIBR, m);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user