HDFS-17346. Fix DirectoryScanner check mark the normal blocks as corrupt (#6476). Contributed by Haiyang Hu.

Reviewed-by: ZanderXu <zanderxu@apache.org>
Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
huhaiyang 2024-01-25 12:49:18 +08:00 committed by GitHub
parent 6b80b1e60f
commit caba9bbab3
4 changed files with 103 additions and 6 deletions

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -162,4 +162,9 @@ public void markSlow(String dnAddr, int[] replies) {}
    * Just delay delete replica a while.
    */
   public void delayDeleteReplica() {}
+
+  /**
+   * Just delay run diff record a while.
+   */
+  public void delayDiffRecord() {}
 }

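Editorial note: the new delayDiffRecord() hook relies on the usual Hadoop fault-injection pattern, where production code calls no-op methods on a process-wide injector instance and tests swap in a subclass that overrides the method they want to perturb. Below is a minimal sketch of that pattern for readers unfamiliar with it; the class and member names are illustrative only, not the actual DataNodeFaultInjector source.

// Minimal sketch of the fault-injection pattern (illustrative names only).
class FaultInjectorSketch {
  // One process-wide instance; production code only ever reads it.
  private static volatile FaultInjectorSketch instance = new FaultInjectorSketch();

  static FaultInjectorSketch get() {
    return instance;
  }

  // Tests replace the instance, then restore the original when done.
  static void set(FaultInjectorSketch injector) {
    instance = injector;
  }

  // No-op in production; a test overrides this to stall the caller at this point.
  void delayDiffRecord() {
  }
}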
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -509,6 +509,7 @@ private void scan() {
       // Pre-sort the reports outside of the lock
       blockPoolReport.sortBlocks();
 
+      DataNodeFaultInjector.get().delayDiffRecord();
       for (final String bpid : blockPoolReport.getBlockPoolIds()) {
         List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);

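The hook is invoked after the scan report has been sorted but before it is diffed against the in-memory replica map, so a test can hold the DirectoryScanner in that window while a concurrent write mutates a replica. The new test added to TestDirectoryScanner further down does exactly this; a condensed sketch of the test-side override is shown here. The 8-second delay is an assumption carried over from that test, chosen to give a concurrent append time to finish.

import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;

// Sketch: install an injector that stalls the scanner at delayDiffRecord().
// The caller is expected to restore the returned injector in a finally block.
public class DelayDiffRecordInjectorSketch {
  public static DataNodeFaultInjector install() {
    DataNodeFaultInjector previous = DataNodeFaultInjector.get();
    DataNodeFaultInjector.set(new DataNodeFaultInjector() {
      @Override
      public void delayDiffRecord() {
        try {
          Thread.sleep(8000); // assumed long enough for a concurrent append to land
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    return previous;
  }
}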
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -2737,9 +2737,6 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
         Block.getGenerationStamp(diskMetaFile.getName()) :
           HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
-    final boolean isRegular = FileUtil.isRegularFile(diskMetaFile, false) &&
-        FileUtil.isRegularFile(diskFile, false);
-
     if (vol.getStorageType() == StorageType.PROVIDED) {
       if (memBlockInfo == null) {
         // replica exists on provided store but not in memory
@@ -2907,9 +2904,17 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
               + memBlockInfo.getNumBytes() + " to "
               + memBlockInfo.getBlockDataLength());
           memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
-      } else if (!isRegular) {
-        corruptBlock = new Block(memBlockInfo);
-        LOG.warn("Block:{} is not a regular file.", corruptBlock.getBlockId());
+      } else {
+        // Check whether the memory block file and meta file are both regular files.
+        File memBlockFile = new File(memBlockInfo.getBlockURI());
+        File memMetaFile = new File(memBlockInfo.getMetadataURI());
+        boolean isRegular = FileUtil.isRegularFile(memMetaFile, false) &&
+            FileUtil.isRegularFile(memBlockFile, false);
+        if (!isRegular) {
+          corruptBlock = new Block(memBlockInfo);
+          LOG.warn("Block:{} has some regular files, block file is {} and meta file is {}.",
+              corruptBlock.getBlockId(), memBlockFile, memMetaFile);
+        }
       }
     } finally {
       if (dataNodeMetrics != null) {

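Why the check moves: the removed code tested the block and meta paths captured in the scan report, and by the time checkAndUpdate() processes that record a concurrent append or similar operation may already have relocated the replica's files, so a healthy block could be reported corrupt. The replacement asks the in-memory replica for its current block and meta URIs and checks those instead. The snippet below is an editorial illustration of that difference, not code from the patch; it assumes FileUtil.isRegularFile(file, false) returns false for a path that no longer exists, and the file names are made up.

import java.io.File;
import java.nio.file.Files;
import org.apache.hadoop.fs.FileUtil;

// Illustration: a regular-file check against a path captured earlier can fail
// after the replica's files are moved, while a check against the replica's
// current location still passes.
public class StaleScanPathSketch {
  public static void main(String[] args) throws Exception {
    File dir = Files.createTempDirectory("blk-demo").toFile();
    File scannedBlockFile = new File(dir, "blk_1001"); // path captured at scan time
    Files.createFile(scannedBlockFile.toPath());

    // Both views agree while nothing moves underneath the scanner.
    System.out.println(FileUtil.isRegularFile(scannedBlockFile, false)); // true

    // A concurrent append/recovery relocates the replica (e.g. finalized -> rbw).
    File currentBlockFile = new File(dir, "blk_1001_rbw");
    Files.move(scannedBlockFile.toPath(), currentBlockFile.toPath());

    // Old behaviour: the stale scan-time path now looks non-regular, so a
    // healthy block would have been marked corrupt.
    System.out.println(FileUtil.isRegularFile(scannedBlockFile, false)); // false

    // Fixed behaviour: check the replica's current block/meta location instead.
    System.out.println(FileUtil.isRegularFile(currentBlockFile, false)); // true
  }
}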
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -40,7 +40,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -56,10 +58,12 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -563,6 +567,88 @@ public void testRegularBlock() throws Exception {
     }
   }
 
+  @Test(timeout = 600000)
+  public void testDirectoryScannerDuringUpdateBlockMeta() throws Exception {
+    Configuration conf = getConfiguration();
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
+          captureLogs(NameNode.stateChangeLog);
+
+      // Add files with 1 blocks.
+      Path path = new Path("/testFile");
+      DFSTestUtil.createFile(fs, path, 50, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 1);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(1, loc.length);
+
+      DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDiffRecord() {
+          try {
+            Thread.sleep(8000);
+          } catch (InterruptedException e) {
+            // Ignore exception.
+          }
+        }
+      };
+      DataNodeFaultInjector.set(dnFaultInjector);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      try {
+        Future<?> directoryScannerFuture = executorService.submit(() -> {
+          try {
+            // Submit tasks run directory scanner.
+            scanner = new DirectoryScanner(fds, conf);
+            scanner.setRetainDiffs(true);
+            scanner.reconcile();
+          } catch (IOException e) {
+            // Ignore exception.
+          }
+        });
+        Future<?> appendBlockFuture = executorService.submit(() -> {
+          try {
+            // Submit tasks run append file.
+            DFSTestUtil.appendFile(fs, path, 50);
+          } catch (Exception e) {
+            // Ignore exception.
+          }
+        });
+
+        // Wait for both tasks to complete.
+        directoryScannerFuture.get();
+        appendBlockFuture.get();
+      } finally {
+        executorService.shutdown();
+      }
+
+      DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+      assertNotNull(stats);
+      assertEquals(1, stats.mismatchBlocks);
+
+      // Check nn log will not reportBadBlocks message.
+      String msg = "*DIR* reportBadBlocks for block: " + bpid + ":" +
+          getBlockFile(lb.getBlock().getBlockId());
+      assertFalse(logCapturer.getOutput().contains(msg));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
   @Test(timeout = 600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning