From 0c35cf0982c2118fdb683adf1a224b2f1f187117 Mon Sep 17 00:00:00 2001
From: dannytbecker <43830149+dannytbecker@users.noreply.github.com>
Date: Thu, 18 Apr 2024 09:04:08 -0700
Subject: [PATCH] HDFS-17477. IncrementalBlockReport race condition additional
 edge cases (#6748)

---
 .../server/blockmanagement/BlockManager.java  |  14 +-
 .../PendingDataNodeMessages.java              |  20 +-
 .../TestPendingDataNodeMessages.java          |  42 +++-
 .../datanode/TestIncrementalBlockReports.java | 202 +++++++++++++++++-
 4 files changed, 267 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f0c88f3755..8f65673806 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3263,6 +3263,7 @@ void processFirstBlockReport(
     for (BlockReportReplica iblk : report) {
       ReplicaState reportedState = iblk.getState();
 
+      removeQueuedBlock(storageInfo, iblk);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initial report of block {} on {} size {} replicaState = {}",
             iblk.getBlockName(), storageInfo.getDatanodeDescriptor(),
@@ -3348,6 +3349,7 @@ private void reportDiff(DatanodeStorageInfo storageInfo,
     // scan the report and process newly reported blocks
     for (BlockReportReplica iblk : newReport) {
       ReplicaState iState = iblk.getState();
+      removeQueuedBlock(storageInfo, iblk);
       LOG.debug("Reported block {} on {} size {} replicaState = {}",
           iblk, dn, iblk.getNumBytes(), iState);
       BlockInfo storedBlock = processReportedBlock(storageInfo,
@@ -3416,7 +3418,6 @@ private BlockInfo processReportedBlock(
     LOG.debug("Reported block {} on {} size {} replicaState = {}", block, dn,
         block.getNumBytes(), reportedState);
 
-
     if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
@@ -3496,6 +3497,16 @@ private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
     pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState);
   }
 
+  /**
+   * Remove the given reported block from the pending queue in the
+   * standby node. @see PendingDataNodeMessages.
+   */
+  private void removeQueuedBlock(DatanodeStorageInfo storageInfo, Block block) {
+    LOG.debug("Removing queued block {} from datanode {} from pending queue.",
+        block, storageInfo.getDatanodeDescriptor());
+    pendingDNMessages.removeQueuedBlock(storageInfo, block);
+  }
+
   /**
    * Try to process any messages that were previously queued for the given
    * block. This is called from FSEditLogLoader whenever a block's state
@@ -4558,6 +4569,7 @@ private boolean processAndHandleReportedBlock(
 
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
+    removeQueuedBlock(storageInfo, block);
     LOG.debug("Reported block {} on {} size {} replicaState = {}", block,
         node, block.getNumBytes(), reportedState);
 
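The BlockManager hunks all follow one pattern: every path that is about to process a replica report first clears whatever an earlier, staler report left queued for that replica on the standby. A rough standalone sketch of the idea, using toy stand-in types rather than the real HDFS classes:

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of the standby's pending-message queue (not the HDFS classes). */
public class PendingQueueSketch {
  // Queued reports keyed by block id; values are "storageId:genStamp" entries.
  private final Map<Long, Queue<String>> pending = new HashMap<>();

  void enqueue(long blockId, String storageId, long genStamp) {
    pending.computeIfAbsent(blockId, k -> new ArrayDeque<>())
        .add(storageId + ":" + genStamp);
  }

  /** What removeQueuedBlock does in spirit: drop every queued message for
   *  this block from this storage, regardless of generation stamp. */
  void removeQueued(long blockId, String storageId) {
    Queue<String> q = pending.get(blockId);
    if (q != null) {
      q.removeIf(e -> e.startsWith(storageId + ":"));
    }
  }

  /** Each report-processing path clears stale queued state first. */
  void processReportedBlock(long blockId, String storageId, long genStamp) {
    removeQueued(blockId, storageId);  // newest report wins
    // ... normal report processing would continue here ...
  }

  public static void main(String[] args) {
    PendingQueueSketch s = new PendingQueueSketch();
    s.enqueue(1L, "STORAGE_1", 1001L);               // stale IBR queued on standby
    s.processReportedBlock(1L, "STORAGE_1", 1002L);  // fresh report arrives
    System.out.println(s.pending.get(1L));           // [] - stale entry was dropped
  }
}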
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
index 5da5429575..cfbb4f0dfa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
@@ -95,11 +95,24 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
 
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
+      Block blkId = new Block(BlockIdManager.convertToStripedID(block
+          .getBlockId()));
+      getBlockQueue(blkId).add(
+          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+    } else {
+      block = new Block(block);
+      getBlockQueue(block).add(
+          new ReportedBlockInfo(storageInfo, block, reportedState));
+    }
+    count++;
+  }
+
+  void removeQueuedBlock(DatanodeStorageInfo storageInfo, Block block) {
     if (storageInfo == null || block == null) {
       return;
     }
     block = new Block(block);
-    long genStamp = block.getGenerationStamp();
     Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
@@ -114,12 +127,9 @@ void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
     // the old reported block will be processed and marked as corrupt by the ANN.
     // See HDFS-17453
     int size = queue.size();
-    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
-        rbi.block.getGenerationStamp() <= genStamp)) {
+    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo))) {
       count -= (size - queue.size());
     }
-    queue.add(new ReportedBlockInfo(storageInfo, block, reportedState));
-    count++;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
index d0fe4542dd..9f3a8d1252 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
@@ -67,14 +67,16 @@ public void testQueues() {
     msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
     msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
     List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo1, block1Gs1, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs1, ReplicaState.FINALIZED),
         new ReportedBlockInfo(storageInfo1, block1Gs2, ReplicaState.FINALIZED),
         new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
 
-    assertEquals(2, msgs.count());
+    assertEquals(4, msgs.count());
 
     // Nothing queued yet for block 2
     assertNull(msgs.takeBlockQueue(block2Gs1));
-    assertEquals(2, msgs.count());
+    assertEquals(4, msgs.count());
 
     Queue<ReportedBlockInfo> q =
         msgs.takeBlockQueue(block1Gs2DifferentInstance);
@@ -123,4 +125,40 @@ public void testPendingDataNodeMessagesWithEC() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testRemoveQueuedBlock() {
+    DatanodeDescriptor fakeDN1 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8898, "/default-rack");
+    DatanodeDescriptor fakeDN2 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8899, "/default-rack");
+    DatanodeStorage storage1 = new DatanodeStorage("STORAGE_ID_1");
+    DatanodeStorage storage2 = new DatanodeStorage("STORAGE_ID_2");
+    DatanodeStorageInfo storageInfo1 = new DatanodeStorageInfo(fakeDN1, storage1);
+    DatanodeStorageInfo storageInfo2 = new DatanodeStorageInfo(fakeDN2, storage2);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
+    List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo2, block1Gs1, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
+
+    assertEquals(4, msgs.count());
+
+    // Nothing queued yet for block 2
+    assertNull(msgs.takeBlockQueue(block2Gs1));
+    assertEquals(4, msgs.count());
+
+    msgs.removeQueuedBlock(storageInfo1, block1Gs1);
+    Queue<ReportedBlockInfo> q =
+        msgs.takeBlockQueue(block1Gs2DifferentInstance);
+    assertEquals(Joiner.on(",").join(rbis),
+        Joiner.on(",").join(q));
+    assertEquals(0, msgs.count());
+
+    // Should be null if we pull again
+    assertNull(msgs.takeBlockQueue(block1Gs2));
+    assertEquals(0, msgs.count());
+  }
 }
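The queue semantics change in two steps: enqueueReportedBlock no longer prunes older entries by generation stamp (the HDFS-17453 behavior), and the new removeQueuedBlock prunes on storage identity alone, since it runs only when a fresh report from that storage is about to be processed. A small sketch of the new pruning, with a toy Rbi type standing in for ReportedBlockInfo:

import java.util.ArrayDeque;
import java.util.Queue;

/** Sketch of the removeQueuedBlock pruning (simplified, not the HDFS code). */
public class RemoveQueuedSketch {
  static final class Rbi {
    final String storageId;
    final long genStamp;
    Rbi(String storageId, long genStamp) {
      this.storageId = storageId;
      this.genStamp = genStamp;
    }
    @Override
    public String toString() {
      return storageId + "@gs" + genStamp;
    }
  }

  public static void main(String[] args) {
    Queue<Rbi> queue = new ArrayDeque<>();
    queue.add(new Rbi("STORAGE_1", 1001));
    queue.add(new Rbi("STORAGE_1", 1002));
    queue.add(new Rbi("STORAGE_2", 1002));
    int count = queue.size();

    // Pruning happens when a fresh report from the same storage is processed,
    // and it drops every queued entry from that storage for the block;
    // generation stamps are deliberately not compared.
    String reportingStorage = "STORAGE_1";
    int size = queue.size();
    if (queue.removeIf(rbi -> reportingStorage.equals(rbi.storageId))) {
      count -= (size - queue.size());
    }
    System.out.println(queue + ", count=" + count);  // [STORAGE_2@gs1002], count=1
  }
}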
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index 998c49452b..6551de4e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -27,14 +27,18 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -246,6 +250,7 @@ public void testIBRRaceCondition() throws Exception {
 
       NameNode nn1 = cluster.getNameNode(0);
       NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
       FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
       List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
       List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
@@ -262,7 +267,6 @@ public void testIBRRaceCondition() throws Exception {
               }
             }
           }
-          ibrsToStandby.add(inv);
           return null;
         }).when(nnSpy).blockReceivedAndDeleted(
             any(DatanodeRegistration.class),
@@ -289,8 +293,9 @@ public void testIBRRaceCondition() throws Exception {
         }
       }
 
-      assertEquals("There should be 3 pending messages from DNs", 3,
-          nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount());
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
       ibrsToStandby.clear();
       // We need to trigger another edit log roll so that the pendingDNMessages
       // are processed.
@@ -308,6 +313,92 @@ public void testIBRRaceCondition() throws Exception {
       }
       ibrsToStandby.clear();
       ibrPhaser.arriveAndDeregister();
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should not be any corrupt replicas", 0,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testIBRRaceCondition2() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrsToStandby.add(inv);
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay reporting to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      ibrsToStandby.clear();
+      ibrPhaser.arriveAndDeregister();
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
       HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
       LOG.info("==================================");
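Both tests gate delivery of the IBRs with a java.util.concurrent.Phaser: one party for the test driver plus one per expected IBR (nine here: three DataNodes times three file operations), so the driver can hold every report until the standby has tailed the edits and then replay them in a chosen order. A standalone sketch of that gating pattern (thread bodies and counts are illustrative, not the test's actual Mockito plumbing):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Standalone sketch of the Phaser gating used by the tests above. */
public class IbrGateSketch {
  public static void main(String[] args) throws Exception {
    // One party for the test driver, plus one per expected IBR.
    Phaser ibrPhaser = new Phaser(1);
    ibrPhaser.bulkRegister(9);

    List<Runnable> deferredIbrs = new ArrayList<>();
    // Each intercepted IBR is captured instead of delivered, and its
    // arrival is recorded so the driver can tell when all 9 were captured.
    for (int i = 0; i < 9; i++) {
      int n = i;
      new Thread(() -> {
        synchronized (deferredIbrs) {
          deferredIbrs.add(() -> System.out.println("replaying IBR " + n));
        }
        ibrPhaser.arriveAndDeregister();  // this IBR has been captured
      }).start();
    }

    // Driver: arrive, then wait until every registered party has arrived,
    // i.e. all IBRs are captured and safe to replay.
    int phase = ibrPhaser.arrive();
    try {
      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      throw new AssertionError("IBRs were not all captured in time", e);
    }
    synchronized (deferredIbrs) {
      deferredIbrs.forEach(Runnable::run);  // replay after the gate opens
    }
  }
}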
@@ -325,4 +416,109 @@ public void testIBRRaceCondition() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testIBRRaceCondition3() throws Exception {
+    cluster.shutdown();
+    Configuration conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      BlockManager bm2 = nn2.getNamesystem().getBlockManager();
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      LinkedHashMap<Long, List<InvocationOnMock>> ibrsToStandby =
+          new LinkedHashMap<>();
+      AtomicLong lowestGenStamp = new AtomicLong(Long.MAX_VALUE);
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                long genStamp = block.getBlock().getGenerationStamp();
+                ibrsToStandby.putIfAbsent(genStamp, new ArrayList<>());
+                ibrsToStandby.get(genStamp).add(inv);
+                lowestGenStamp.getAndUpdate((prev) -> Math.min(prev, genStamp));
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay reporting to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      ibrsToStandby.forEach((genStamp, ibrs) -> {
+        if (lowestGenStamp.get() != genStamp) {
+          ibrs.removeIf(inv -> {
+            try {
+              inv.callRealMethod();
+            } catch (Throwable t) {
+              LOG.error("Exception thrown while calling sendIBRs: ", t);
+            }
+            return true;
+          });
+        }
+      });
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 0,
+          1000, 30000,
+          "There should be 0 pending DN messages");
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+
+      // Send old IBRs to simulate actual stale or corrupt DNs
+      for (InvocationOnMock sendIBR : ibrsToStandby.get(lowestGenStamp.get())) {
+        try {
+          sendIBR.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      GenericTestUtils.waitFor(() -> bm2.getPendingDataNodeMessageCount() == 3,
+          1000, 30000,
+          "There should be 3 pending DN messages");
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should be 1 corrupt replica", 1,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
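testIBRRaceCondition3 buckets the captured IBRs by generation stamp and tracks the minimum, so every newer batch can be delivered first and the oldest batch replayed afterwards to mimic a DataNode reporting a stale replica. A standalone sketch of that bookkeeping, with toy values in place of Mockito invocations:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of the genstamp bookkeeping in testIBRRaceCondition3 (toy data). */
public class GenStampReplaySketch {
  public static void main(String[] args) {
    // Captured IBRs grouped by generation stamp, in arrival order.
    LinkedHashMap<Long, List<String>> ibrsByGenStamp = new LinkedHashMap<>();
    AtomicLong lowestGenStamp = new AtomicLong(Long.MAX_VALUE);

    long[] observed = {1001L, 1002L, 1003L, 1001L, 1002L, 1003L};
    for (long genStamp : observed) {
      ibrsByGenStamp.putIfAbsent(genStamp, new ArrayList<>());
      ibrsByGenStamp.get(genStamp).add("IBR@" + genStamp);
      lowestGenStamp.getAndUpdate(prev -> Math.min(prev, genStamp));
    }

    // Replay everything except the lowest-genstamp batch first...
    ibrsByGenStamp.forEach((genStamp, ibrs) -> {
      if (genStamp != lowestGenStamp.get()) {
        ibrs.forEach(ibr -> System.out.println("deliver fresh " + ibr));
      }
    });
    // ...then replay the stale batch, as a DN with an outdated replica would.
    ibrsByGenStamp.get(lowestGenStamp.get())
        .forEach(ibr -> System.out.println("deliver stale " + ibr));
  }
}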