HDFS-16402. Improve HeartbeatManager logic to avoid incorrect stats. (#3839). Contributed by tomscut.

Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
litao 2022-01-24 14:26:30 +08:00 committed by GitHub
parent dae33cf935
commit 15b820c83c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 11 deletions

View File

@ -2691,7 +2691,7 @@ public class BlockManager implements BlockStatsMXBean {
void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
BlockManagerFaultInjector.getInstance().mockAnException();
for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}
@ -2703,6 +2703,7 @@ public class BlockManager implements BlockStatsMXBean {
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
BlockManagerFaultInjector.getInstance().mockAnException();
for (StorageReport report: reports) {
providedStorageMap.updateStorage(node, report.getStorage());
}

View File

@ -49,4 +49,8 @@ public class BlockManagerFaultInjector {
@VisibleForTesting
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
}
@VisibleForTesting
public void mockAnException() {
}
}

View File

@ -256,9 +256,12 @@ class HeartbeatManager implements DatanodeStatistics {
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
try {
blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
} finally {
stats.add(node);
}
}
synchronized void updateLifeline(final DatanodeDescriptor node,
@ -266,13 +269,16 @@ class HeartbeatManager implements DatanodeStatistics {
int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) {
stats.subtract(node);
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
stats.add(node);
try {
// This intentionally calls updateHeartbeatState instead of
// updateHeartbeat, because we don't want to modify the
// heartbeatedSinceRegistration flag. Arrival of a lifeline message does
// not count as arrival of the first heartbeat.
blockManager.updateHeartbeatState(node, reports, cacheCapacity, cacheUsed,
xceiverCount, failedVolumes, volumeFailureSummary);
} finally {
stats.add(node);
}
}
synchronized void startDecommission(final DatanodeDescriptor node) {

View File

@ -3943,4 +3943,8 @@ public class DataNode extends ReconfigurableBase
boolean isSlownode() {
return blockPoolManager.isSlownode();
}
BlockPoolManager getBlockPoolManager() {
return blockPoolManager;
}
}

View File

@ -25,6 +25,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@ -95,6 +98,12 @@ public class TestDataNodeLifeline {
private FSNamesystem namesystem;
private DataNode dn;
private BPServiceActor bpsa;
private final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
@Override
public void mockAnException() {
throw new UnknownError("Unknown exception");
}
};
@Before
public void setup() throws Exception {
@ -336,4 +345,49 @@ public class TestDataNodeLifeline {
return result;
}
}
/**
* Mock an exception in HeartbeatManager#updateHeartbeat and HeartbeatManager#updateLifeline
* respectively, and trigger the heartbeat and lifeline in sequence. The capacityTotal obtained
* before and after this operation should be the same.
* @throws Exception
*/
@Test
public void testHeartbeatAndLifelineOnError() throws Exception {
final Configuration config = new HdfsConfiguration();
config.set(DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, "0.0.0.0:0");
try(MiniDFSCluster cluster =
new MiniDFSCluster.Builder(config).numDataNodes(1).build()) {
cluster.waitActive();
final FSNamesystem fsNamesystem = cluster.getNamesystem();
// Get capacityTotal before triggering heartbeat and lifeline.
DatanodeStatistics datanodeStatistics =
fsNamesystem.getBlockManager().getDatanodeManager().getDatanodeStatistics();
long capacityTotalBefore = datanodeStatistics.getCapacityTotal();
// Mock an exception in HeartbeatManager#updateHeartbeat and HeartbeatManager#updateLifeline.
BlockManagerFaultInjector.instance = injector;
DataNode dataNode = cluster.getDataNodes().get(0);
BlockPoolManager blockPoolManager = dataNode.getBlockPoolManager();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
if (bpos != null) {
for (BPServiceActor actor : bpos.getBPServiceActors()) {
try {
actor.triggerHeartbeatForTests();
actor.sendLifelineForTests();
} catch (Throwable e) {
assertTrue(e.getMessage().contains("Unknown exception"));
}
}
}
}
// Get capacityTotal after triggering heartbeat and lifeline.
long capacityTotalAfter = datanodeStatistics.getCapacityTotal();
// The capacityTotal should be same.
assertEquals(capacityTotalBefore, capacityTotalAfter);
}
}
}