From 4bd873b816dbd889f410428d6e618586d4ff1780 Mon Sep 17 00:00:00 2001
From: huhaiyang
Date: Wed, 28 Jun 2023 12:03:15 +0800
Subject: [PATCH] HDFS-17044. Set size of non-exist block to NO_ACK when
 process FBR or IBR to avoid useless report from DataNode. (#5735).
 Contributed by Haiyang Hu.

Signed-off-by: He Xiaoqiao
---
 .../server/blockmanagement/BlockManager.java |  7 +-
 .../datanode/metrics/DataNodeMetrics.java    |  4 +
 .../blockmanagement/TestBlockManager.java    | 80 +++++++++++++++++++
 3 files changed, 89 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 16b79539fd..fab3619cb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3295,8 +3295,11 @@ private BlockInfo processReportedBlock(
     BlockInfo storedBlock = getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
-      // the replica should be removed from the data-node.
-      toInvalidate.add(new Block(block));
+      // the replica should be removed from the DataNode. Set numBytes to BlockCommand.NO_ACK
+      // so the DataNode does not send a useless report to the NameNode once the deletion completes.
+      Block invalidateBlock = new Block(block);
+      invalidateBlock.setNumBytes(BlockCommand.NO_ACK);
+      toInvalidate.add(invalidateBlock);
       return null;
     }
     BlockUCState ucState = storedBlock.getBlockUCState();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index c3aa3c3a45..da19f28711 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -346,6 +346,10 @@ public void incrBlocksRemoved(int delta) {
     blocksRemoved.incr(delta);
   }
 
+  public long getBlocksRemoved() {
+    return blocksRemoved.value();
+  }
+
   public void incrBytesWritten(int delta) {
     bytesWritten.incr(delta);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index a054511423..3e00491e99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -117,6 +117,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -2121,4 +2122,83 @@ public void testBlockReportAfterDataNodeRestart() throws Exception {
       assertEquals(2, locs[0].getHosts().length);
     }
   }
+
+  /**
+   * Test processing toInvalidate during a block report: if the reported block
+   * does not exist in the blocksMap, its numBytes must be set to NO_ACK so
+   * that the DataNode does not send an incremental report after deleting it.
+   */
+  @Test(timeout = 360000)
+  public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 10);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build()) {
+      cluster.waitActive();
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Write a file.
+      Path file = new Path("/test");
+      DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
+      DFSTestUtil.waitReplication(fs, file, (short) 1);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, file).get(0);
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(1, loc.length);
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+      assertEquals(datanode.getDatanodeUuid(), loc[0].getDatanodeUuid());
+
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+      // The DataNode's IncrementalBlockReportsNumOps should be 1 after the write.
+      assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
+
+      // Delete the file and remove the block.
+      fs.delete(file, false);
+
+      // Wait for the processing of the marked-deleted block to complete.
+      BlockManagerTestUtil.waitForMarkedDeleteQueueIsEmpty(blockManager);
+      assertNull(blockManager.getStoredBlock(lb.getBlock().getLocalBlock()));
+
+      // Expire the heartbeat on the NameNode so the DataNode is marked dead.
+      datanode.setHeartbeatsDisabledForTests(true);
+      cluster.setDataNodeDead(datanode.getDatanodeId());
+      assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()));
+
+      // Wait for re-registration and heartbeat.
+      datanode.setHeartbeatsDisabledForTests(false);
+      final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
+          .getBlockManager().getDatanodeManager()
+          .getDatanode(datanode.getDatanodeId());
+      GenericTestUtils.waitFor(
+          () -> dn1Desc.isAlive() && dn1Desc.isHeartbeatedSinceRegistration(),
+          100, 5000);
+
+      // Trigger a block report; since the block no longer exists, the NameNode
+      // adds it to invalidateBlocks with numBytes set to NO_ACK.
+      cluster.triggerBlockReports();
+      GenericTestUtils.waitFor(
+          () -> blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()),
+          100, 1000);
+
+      // Schedule block deletion work for the DataNode.
+      int workCount = blockManager.computeInvalidateWork(1);
+      assertEquals(1, workCount);
+      assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()));
+
+      // Wait for the DataNode's blocksRemoved metric to reach 1.
+      GenericTestUtils.waitFor(
+          () -> datanode.getMetrics().getBlocksRemoved() == 1,
+          100, 5000);
+
+      // Trigger an immediate deletion report at the DataNode.
+      cluster.triggerDeletionReports();
+
+      // The deleted block's numBytes is NO_ACK, so no deletion report is sent;
+      // the DataNode's IncrementalBlockReportsNumOps should still be 1.
+      assertEquals(1, getLongCounter("IncrementalBlockReportsNumOps", rb));
+    }
+  }
 }
\ No newline at end of file
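
For context, this change relies on an existing DataNode-side convention: a block scheduled for invalidation whose numBytes equals BlockCommand.NO_ACK is deleted without queuing a deletion acknowledgement in an incremental block report (IBR). The sketch below only illustrates that convention; the class and method names are hypothetical stand-ins, not the actual DataNode deletion code path, while Block, Block#getNumBytes(), and BlockCommand.NO_ACK are the real HDFS types and constant referenced by the patch.

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;

/** Hypothetical illustration of the NO_ACK convention this patch depends on. */
class NoAckDeletionSketch {
  /**
   * A DataNode normally acknowledges a replica deletion via an incremental
   * block report. If the NameNode marked the invalidated block with
   * numBytes == NO_ACK (as this patch does for blocks missing from the
   * blocksMap), the acknowledgement can be skipped, avoiding a useless report.
   */
  static boolean shouldAckDeletion(Block deleted) {
    return deleted.getNumBytes() != BlockCommand.NO_ACK;
  }
}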