HDFS-17453. IncrementalBlockReport can have race condition with Edit Log Tailer (#6708)

dannytbecker 2024-04-10 09:30:24 -07:00 committed by GitHub
parent dbe2d61258
commit 05964ad07a
4 changed files with 153 additions and 33 deletions

PendingDataNodeMessages.java

@@ -95,16 +95,30 @@ void removeAllMessagesForDatanode(DatanodeDescriptor dn) {
   void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState) {
+    if (storageInfo == null || block == null) {
+      return;
+    }
+    block = new Block(block);
+    long genStamp = block.getGenerationStamp();
+    Queue<ReportedBlockInfo> queue = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       Block blkId = new Block(BlockIdManager.convertToStripedID(block
           .getBlockId()));
-      getBlockQueue(blkId).add(
-          new ReportedBlockInfo(storageInfo, new Block(block), reportedState));
+      queue = getBlockQueue(blkId);
     } else {
-      block = new Block(block);
-      getBlockQueue(block).add(
-          new ReportedBlockInfo(storageInfo, block, reportedState));
+      queue = getBlockQueue(block);
     }
+    // We only want the latest non-future reported block to be queued for each
+    // DataNode. Otherwise, there can be a race condition that causes an old
+    // reported block to be kept in the queue until the SNN switches to ANN and
+    // the old reported block will be processed and marked as corrupt by the ANN.
+    // See HDFS-17453
+    int size = queue.size();
+    if (queue.removeIf(rbi -> storageInfo.equals(rbi.storageInfo) &&
+        rbi.block.getGenerationStamp() <= genStamp)) {
+      count -= (size - queue.size());
+    }
+    queue.add(new ReportedBlockInfo(storageInfo, block, reportedState));
     count++;
   }
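
The hunk above makes the pending queue keep, per storage, only the newest reported generation stamp. Below is a minimal standalone sketch of that latest-wins rule using plain java.util collections; DedupSketch and Report are invented stand-ins for illustration, not the HDFS PendingDataNodeMessages or ReportedBlockInfo classes.

// Standalone illustration only; assumes Java 16+ for records.
import java.util.ArrayDeque;
import java.util.Queue;

class DedupSketch {
  // Simplified stand-in for a reported block: which storage reported it, and
  // the generation stamp it reported.
  record Report(String storageId, long genStamp) {}

  private final Queue<Report> queue = new ArrayDeque<>();

  void enqueue(Report incoming) {
    // Drop any report from the same storage whose generation stamp is not
    // newer than the incoming one, so a stale report queued earlier cannot
    // outlive the fresher one and later be treated as corrupt.
    queue.removeIf(r -> r.storageId().equals(incoming.storageId())
        && r.genStamp() <= incoming.genStamp());
    queue.add(incoming);
  }

  int size() {
    return queue.size();
  }

  public static void main(String[] args) {
    DedupSketch q = new DedupSketch();
    q.enqueue(new Report("s1", 1001));
    q.enqueue(new Report("s1", 1002)); // replaces the genstamp-1001 entry
    q.enqueue(new Report("s2", 1001)); // different storage, kept separately
    System.out.println(q.size());      // prints 2
  }
}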

TestPendingDataNodeMessages.java

@@ -20,6 +20,8 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Queue;
 
 import org.apache.hadoop.conf.Configuration;
@@ -52,11 +54,21 @@ public class TestPendingDataNodeMessages {
   @Test
   public void testQueues() {
-    DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor();
-    DatanodeStorage storage = new DatanodeStorage("STORAGE_ID");
-    DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED);
-    msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED);
+    DatanodeDescriptor fakeDN1 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8898, "/default-rack");
+    DatanodeDescriptor fakeDN2 = DFSTestUtil.getDatanodeDescriptor(
+        "localhost", 8899, "/default-rack");
+    DatanodeStorage storage1 = new DatanodeStorage("STORAGE_ID_1");
+    DatanodeStorage storage2 = new DatanodeStorage("STORAGE_ID_2");
+    DatanodeStorageInfo storageInfo1 = new DatanodeStorageInfo(fakeDN1, storage1);
+    DatanodeStorageInfo storageInfo2 = new DatanodeStorageInfo(fakeDN2, storage2);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs1, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo1, block1Gs2, ReplicaState.FINALIZED);
+    msgs.enqueueReportedBlock(storageInfo2, block1Gs2, ReplicaState.FINALIZED);
+    List<ReportedBlockInfo> rbis = Arrays.asList(
+        new ReportedBlockInfo(storageInfo1, block1Gs2, ReplicaState.FINALIZED),
+        new ReportedBlockInfo(storageInfo2, block1Gs2, ReplicaState.FINALIZED));
 
     assertEquals(2, msgs.count());
@@ -66,9 +78,7 @@ public void testQueues() {
     Queue<ReportedBlockInfo> q =
         msgs.takeBlockQueue(block1Gs2DifferentInstance);
-    assertEquals(
-        "ReportedBlockInfo [block=blk_1_1, dn=127.0.0.1:9866, reportedState=FINALIZED]," +
-        "ReportedBlockInfo [block=blk_1_2, dn=127.0.0.1:9866, reportedState=FINALIZED]",
+    assertEquals(Joiner.on(",").join(rbis),
         Joiner.on(",").join(q));
     assertEquals(0, msgs.count());

TestIncrementalBlockReports.java

@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +66,9 @@ public class TestIncrementalBlockReports {
   private static final long DUMMY_BLOCK_ID = 5678;
   private static final long DUMMY_BLOCK_LENGTH = 1024 * 1024;
   private static final long DUMMY_BLOCK_GENSTAMP = 1000;
+  private static final String TEST_FILE_DATA = "hello world";
+  private static final String TEST_FILE = "/TestStandbyBlockManagement";
+  private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
 
   private MiniDFSCluster cluster = null;
   private Configuration conf;
@@ -215,4 +229,100 @@ public void testReplaceReceivedBlock() throws InterruptedException, IOException
       cluster = null;
     }
   }
+
+  @Test
+  public void testIBRRaceCondition() throws Exception {
+    cluster.shutdown();
+    conf = new Configuration();
+    HAUtil.setAllowStandbyReads(conf, true);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
+      List<InvocationOnMock> ibrsToStandby = new ArrayList<>();
+      List<DatanodeProtocolClientSideTranslatorPB> spies = new ArrayList<>();
+      Phaser ibrPhaser = new Phaser(1);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DatanodeProtocolClientSideTranslatorPB nnSpy =
+            InternalDataNodeTestUtils.spyOnBposToNN(dn, nn2);
+        doAnswer((inv) -> {
+          for (StorageReceivedDeletedBlocks srdb :
+              inv.getArgument(2, StorageReceivedDeletedBlocks[].class)) {
+            for (ReceivedDeletedBlockInfo block : srdb.getBlocks()) {
+              if (block.getStatus().equals(BlockStatus.RECEIVED_BLOCK)) {
+                ibrPhaser.arriveAndDeregister();
+              }
+            }
+          }
+          ibrsToStandby.add(inv);
+          return null;
+        }).when(nnSpy).blockReceivedAndDeleted(
+            any(DatanodeRegistration.class),
+            anyString(),
+            any(StorageReceivedDeletedBlocks[].class));
+        spies.add(nnSpy);
+      }
+
+      LOG.info("==================================");
+      // Force the DNs to delay report to the SNN
+      ibrPhaser.bulkRegister(9);
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      // SNN has caught up to the latest edit log so we send the IBRs to SNN
+      int phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+      assertEquals("There should be 3 pending messages from DNs", 3,
+          nn2.getNamesystem().getBlockManager().getPendingDataNodeMessageCount());
+      ibrsToStandby.clear();
+
+      // We need to trigger another edit log roll so that the pendingDNMessages
+      // are processed.
+      ibrPhaser.bulkRegister(6);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      DFSTestUtil.appendFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+      phase = ibrPhaser.arrive();
+      ibrPhaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
+      for (InvocationOnMock sendIBRs : ibrsToStandby) {
+        try {
+          sendIBRs.callRealMethod();
+        } catch (Throwable t) {
+          LOG.error("Exception thrown while calling sendIBRs: ", t);
+        }
+      }
+      ibrsToStandby.clear();
+      ibrPhaser.arriveAndDeregister();
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_FILE_PATH);
+      HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
+      LOG.info("==================================");
+
+      // Trigger an active switch to force SNN to mark blocks as corrupt if they
+      // have a bad genstamp in the pendingDNMessages queue.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      cluster.waitActive(1);
+      assertEquals("There should not be any corrupt replicas", 0,
+          nn2.getNamesystem().getBlockManager()
+              .numCorruptReplicas(block.getLocalBlock()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
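
The new test coordinates the intercepted IBRs with a java.util.concurrent.Phaser: the spy's doAnswer calls arriveAndDeregister() once per RECEIVED_BLOCK, while the test thread arrives and waits for the phase to advance before replaying the captured calls. Below is a minimal sketch of that handshake in isolation; the party count of 3 and the class name PhaserSketch are illustrative only (the test registers 9 parties, apparently one per DataNode per write or append).

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PhaserSketch {
  public static void main(String[] args) throws InterruptedException {
    // One party for the controlling thread, plus one per expected event.
    Phaser phaser = new Phaser(1);
    phaser.bulkRegister(3);

    // Each "event" thread deregisters when it fires, mirroring the spy's
    // arriveAndDeregister() per intercepted RECEIVED_BLOCK IBR.
    for (int i = 0; i < 3; i++) {
      new Thread(phaser::arriveAndDeregister).start();
    }

    // The controller arrives, then blocks until every registered party has
    // arrived, i.e. until all expected events have been observed.
    int phase = phaser.arrive();
    try {
      phaser.awaitAdvanceInterruptibly(phase, 60, TimeUnit.SECONDS);
      System.out.println("all expected events observed");
    } catch (TimeoutException e) {
      System.out.println("timed out waiting for events");
    }
  }
}

In the test, this barrier ensures every IBR has been generated and captured before the captured calls are replayed to the Standby, which by then has already tailed the newer edits.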

TestDNFencing.java

@@ -421,9 +421,6 @@ public void testBlockReportsWhileFileBeingWritten() throws Exception {
    */
   @Test
   public void testQueueingWithAppend() throws Exception {
-    int numQueued = 0;
-    int numDN = cluster.getDataNodes().size();
-
     // case 1: create file and call hflush after write
     FSDataOutputStream out = fs.create(TEST_FILE_PATH);
     try {
@@ -436,20 +433,16 @@ public void testQueueingWithAppend() throws Exception {
       // Apply cluster.triggerBlockReports() to trigger the reporting sooner.
       //
       cluster.triggerBlockReports();
-      numQueued += numDN; // RBW messages
 
       // The cluster.triggerBlockReports() call above does a full
       // block report that incurs 3 extra RBW messages
-      numQueued += numDN; // RBW messages
     } finally {
       IOUtils.closeStream(out);
-      numQueued += numDN; // blockReceived messages
     }
 
     cluster.triggerBlockReports();
-    numQueued += numDN;
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     // case 2: append to file and call hflush after write
     try {
@@ -457,14 +450,12 @@ public void testQueueingWithAppend() throws Exception {
       AppendTestUtil.write(out, 10, 10);
       out.hflush();
       cluster.triggerBlockReports();
-      numQueued += numDN * 2; // RBW messages, see comments in case 1
     } finally {
       IOUtils.closeStream(out);
       cluster.triggerHeartbeats();
-      numQueued += numDN; // blockReceived
     }
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     // case 3: similar to case 2, except no hflush is called.
     try {
@@ -483,17 +474,12 @@ public void testQueueingWithAppend() throws Exception {
       // BPServiceActor#addPendingReplicationBlockInfo
       //
       IOUtils.closeStream(out);
-      numQueued += numDN; // blockReceived
     }
     cluster.triggerBlockReports();
-    numQueued += numDN;
 
-    LOG.info("Expect " + numQueued + " and got: " + cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
-    assertEquals(numQueued, cluster.getNameNode(1).getNamesystem().
-        getPendingDataNodeMessageCount());
+    assertEquals("The queue should only have the latest report for each DN",
+        3, nn2.getNamesystem().getPendingDataNodeMessageCount());
 
     cluster.transitionToStandby(0);
     cluster.transitionToActive(1);