diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7b116d9e56..e9cdb2cc92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -162,4 +162,9 @@ public class DataNodeFaultInjector {
    * Just delay delete replica a while.
    */
   public void delayDeleteReplica() {}
+
+  /**
+   * Just delay the diff record a while.
+   */
+  public void delayDiffRecord() {}
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index bf88e6fe88..30a2d2e584 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -509,6 +509,7 @@ public class DirectoryScanner implements Runnable {
 
     // Pre-sort the reports outside of the lock
     blockPoolReport.sortBlocks();
+    DataNodeFaultInjector.get().delayDiffRecord();
 
     for (final String bpid : blockPoolReport.getBlockPoolIds()) {
       List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index d3ac60d4a3..27fcbb12fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2737,9 +2737,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         Block.getGenerationStamp(diskMetaFile.getName()) :
         HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
-    final boolean isRegular = FileUtil.isRegularFile(diskMetaFile, false) &&
-        FileUtil.isRegularFile(diskFile, false);
-
     if (vol.getStorageType() == StorageType.PROVIDED) {
       if (memBlockInfo == null) {
         // replica exists on provided store but not in memory
@@ -2907,9 +2904,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               + memBlockInfo.getNumBytes() + " to "
              + memBlockInfo.getBlockDataLength());
           memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
-        } else if (!isRegular) {
-          corruptBlock = new Block(memBlockInfo);
-          LOG.warn("Block:{} is not a regular file.", corruptBlock.getBlockId());
+        } else {
+          // Check whether the memory block file and meta file are both regular files.
+          File memBlockFile = new File(memBlockInfo.getBlockURI());
+          File memMetaFile = new File(memBlockInfo.getMetadataURI());
+          boolean isRegular = FileUtil.isRegularFile(memMetaFile, false) &&
+              FileUtil.isRegularFile(memBlockFile, false);
+          if (!isRegular) {
+            corruptBlock = new Block(memBlockInfo);
+            LOG.warn("Block:{} is not backed by regular files, block file is {} and meta file is {}.",
+                corruptBlock.getBlockId(), memBlockFile, memMetaFile);
+          }
         }
       } finally {
         if (dataNodeMetrics != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 244b60e138..96b3263963 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -40,7 +40,9 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -56,10 +58,12 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -563,6 +567,88 @@ public class TestDirectoryScanner {
     }
   }
 
+  @Test(timeout = 600000)
+  public void testDirectoryScannerDuringUpdateBlockMeta() throws Exception {
+    Configuration conf = getConfiguration();
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
+          captureLogs(NameNode.stateChangeLog);
+
+      // Add a file with one block.
+      Path path = new Path("/testFile");
+      DFSTestUtil.createFile(fs, path, 50, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, path, (short) 1);
+      LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, path).get(0);
+      DatanodeInfo[] loc = lb.getLocations();
+      assertEquals(1, loc.length);
+      DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+        @Override
+        public void delayDiffRecord() {
+          try {
+            Thread.sleep(8000);
+          } catch (InterruptedException e) {
+            // Ignore exception.
+          }
+        }
+      };
+
+      DataNodeFaultInjector.set(dnFaultInjector);
+      ExecutorService executorService = Executors.newFixedThreadPool(2);
+      try {
+        Future<?> directoryScannerFuture = executorService.submit(() -> {
+          try {
+            // Run the directory scanner.
+            scanner = new DirectoryScanner(fds, conf);
+            scanner.setRetainDiffs(true);
+            scanner.reconcile();
+          } catch (IOException e) {
+            // Ignore exception.
+          }
+        });
+
+        Future<?> appendBlockFuture = executorService.submit(() -> {
+          try {
+            // Concurrently append data to the file.
+            DFSTestUtil.appendFile(fs, path, 50);
+          } catch (Exception e) {
+            // Ignore exception.
+          }
+        });
+
+        // Wait for both tasks to complete.
+        directoryScannerFuture.get();
+        appendBlockFuture.get();
+      } finally {
+        executorService.shutdown();
+      }
+
+      DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+      assertNotNull(stats);
+      assertEquals(1, stats.mismatchBlocks);
+
+      // Check that the NameNode log contains no reportBadBlocks message.
+      String msg = "*DIR* reportBadBlocks for block: " + bpid + ":"
+          + getBlockFile(lb.getBlock().getBlockId());
+      assertFalse(logCapturer.getOutput().contains(msg));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
   @Test(timeout = 600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning