diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
index 96e5617344..5da5429575 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
@@ -95,16 +95,30 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
   }
 
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    if (storageInfo == null || block == null) {
+      return;
+    }
+    block = new Block(block);
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
-      block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the ANN.
+    // See HDFS-17453
+    int size = queue.size();
+    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
+        rbi.block.getGenerationStamp() <= genStamp)) {
+      count -= (size - queue.size());
+    }
+    queue.add(new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
index ebc073d53d..d0fe4542dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java
@@ -20,6 +20,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -52,11 +54,21 @@ public class TestPendingDataNodeMessages {
 
   @Test
   public void testQueues() {
-    DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
-    DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
+    DatanodeDescriptor fakeDN1 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8898, "/default-rack");
+    DatanodeDescriptor fakeDN2 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8899, "/default-rack");
+    DatanodeStorage storage1 = new DatanodeStorage("STORAGE_ID_1");
+    DatanodeStorage storage2 = new DatanodeStorage("STORAGE_ID_2");
+    DatanodeStorageInfo storageInfo1 = new DatanodeStorageInfo(fakeDN1, storage1);
+    DatanodeStorageInfo storageInfo2 = new DatanodeStorageInfo(fakeDN2, storage2);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
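+    // Each storage's genstamp 2 report should have replaced its genstamp 1
+    // report on enqueue, leaving only the two latest reports in the queue.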
+    List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo1, block1Gs2, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
 
     assertEquals(2, msgs.count());
@@ -66,9 +78,7 @@ public void testQueues() {
 
     Queue<ReportedBlockInfo> q =
         msgs.takeBlockQueue(block1Gs2DifferentInstance);
-    assertEquals(
-        "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:9866, reportedState=FINALIZED]," +
-        "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:9866, reportedState=FINALIZED]",
+    assertEquals(Joiner.on(",").join(rbis),
         Joiner.on(",").join(q));
 
     assertEquals(0, msgs.count());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
index 4221ecaf2a..998c49452b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java
@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +66,9 @@ public class TestIncrementalBlockReports {
   private static final long DUMMY_BLOCK_ID = 5678;
   private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024;
   private static final long DUMMY_BLOCK_GENSTAMP = 1000;
+  private static final String TEST_FILE_DATA = "hello world";
+  private static final String TEST_FILE = "/TestStandbyBlockManagement";
+  private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
 
   private MiniDFSCluster cluster = null;
   private Configuration conf;
@@ -215,4 +229,100 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
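+      // Spy on each DN's channel to the standby NN: intercepted IBRs are held
+      // in ibrsToStandby and replayed manually, while the Phaser tracks how
+      // many RECEIVED_BLOCK notifications have been intercepted so far.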
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay report to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+
+      assertEquals("There should be 3 pending messages from DNs", 3,
+          nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount());
+      ibrsToStandby.clear();
+      // We need to trigger another edit log roll so that the pendingDNMessages
+      // are processed.
+      ibrPhaser.bulkRegister(6);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+      ibrsToStandby.clear();
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
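+      // With the fix from HDFS-17453, stale reports were replaced as newer
+      // ones arrived, so no replica should be marked corrupt after failover.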
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+
+      assertEquals("There should not be any corrupt replicas", 0,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 44c3984d98..9d79e49610 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -421,9 +421,6 @@ public void testBlockReportsWhileFileBeingWritten() throws Exception {
    */
   @Test
   public void testQueueingWithAppend() throws Exception {
-    int numQueued = 0;
-    int numDN = cluster.getDataNodes().size();
-
     // case 1: create file and call hflush after write
     FSDataOutputStream out = fs.create(TEST_FILE_PATH);
     try {
@@ -436,20 +433,16 @@ public void testQueueingWithAppend() throws Exception {
       // Apply cluster.triggerBlockReports() to trigger the reporting sooner.
       //
       cluster.triggerBlockReports();
-      numQueued += numDN; // RBW messages
 
       // The cluster.triggerBlockReports() call above does a full
       // block report that incurs 3 extra RBW messages
-      numQueued += numDN; // RBW messages
     } finally {
       IOUtils.closeStream(out);
-      numQueued += numDN; // blockReceived messages
     }
 
     cluster.triggerBlockReports();
-    numQueued += numDN;
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     // case 2: append to file and call hflush after write
     try {
@@ -457,14 +450,12 @@ public void testQueueingWithAppend() throws Exception {
       AppendTestUtil.write(out, 10, 10);
       out.hflush();
       cluster.triggerBlockReports();
-      numQueued += numDN * 2; // RBW messages, see comments in case 1
     } finally {
       IOUtils.closeStream(out);
       cluster.triggerHeartbeats();
-      numQueued += numDN; // blockReceived
     }
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     // case 3: similar to case 2, except no hflush is called.
     try {
@@ -483,17 +474,12 @@ public void testQueueingWithAppend() throws Exception {
       // BPServiceActor#addPendingReplicationBlockInfo
       //
       IOUtils.closeStream(out);
-      numQueued += numDN; // blockReceived
     }
 
     cluster.triggerBlockReports();
-    numQueued += numDN;
-    LOG.info("Expect " + numQueued + " and got: " + cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
-
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     cluster.transitionToStandby(0);
     cluster.transitionToActive(1);