HDFS-16316. Improve DirectoryScanner: add regular file check related block. (#3861)
parent 2f448acf39
commit 589695c6a9
@@ -38,6 +38,7 @@
 import java.nio.file.AccessDeniedException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
+import java.nio.file.LinkOption;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -533,6 +534,26 @@ private static Path checkDest(String srcName, FileSystem dstFS, Path dst,
     return dst;
   }
 
+  public static boolean isRegularFile(File file) {
+    return isRegularFile(file, true);
+  }
+
+  /**
+   * Check if the file is regular.
+   * @param file The file being checked.
+   * @param allowLinks Whether to allow matching links.
+   * @return Returns the result of checking whether the file is a regular file.
+   */
+  public static boolean isRegularFile(File file, boolean allowLinks) {
+    if (file != null) {
+      if (allowLinks) {
+        return Files.isRegularFile(file.toPath());
+      }
+      return Files.isRegularFile(file.toPath(), LinkOption.NOFOLLOW_LINKS);
+    }
+    return true;
+  }
+
   /**
    * Convert a os-native filename to a path that works for the shell.
    * @param filename The filename to convert
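The new helper delegates to java.nio.file.Files. The standalone sketch below is not part of this change; the class name, temp-file names, and the assumption of a POSIX platform that permits symlink creation are illustrative only. It shows why passing LinkOption.NOFOLLOW_LINKS, as isRegularFile(file, false) does, rejects a symlink that the default call would accept.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;

public class RegularFileCheckDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical demo layout: a temp directory with a regular file
    // and a symlink pointing at it.
    Path dir = Files.createTempDirectory("regcheck");
    Path target = Files.createFile(dir.resolve("data"));
    Path link = Files.createSymbolicLink(dir.resolve("data-link"), target);

    // Following links (the allowLinks == true path): both report true.
    System.out.println(Files.isRegularFile(target)); // true
    System.out.println(Files.isRegularFile(link));   // true

    // NOFOLLOW_LINKS (the allowLinks == false path): the symlink itself
    // is not a regular file, so this reports false.
    System.out.println(Files.isRegularFile(link, LinkOption.NOFOLLOW_LINKS)); // false
  }
}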
@@ -1466,6 +1466,23 @@ public void testReadSymlink() throws IOException {
     Assert.assertEquals(file.getAbsolutePath(), result);
   }
 
+  @Test
+  public void testRegularFile() throws IOException {
+    byte[] data = "testRegularData".getBytes();
+    File tmpFile = new File(del, "reg1");
+
+    // write some data to the file
+    FileOutputStream os = new FileOutputStream(tmpFile);
+    os.write(data);
+    os.close();
+    assertTrue(FileUtil.isRegularFile(tmpFile));
+
+    // create a symlink to file
+    File link = new File(del, "reg2");
+    FileUtil.symLink(tmpFile.toString(), link.toString());
+    assertFalse(FileUtil.isRegularFile(link, false));
+  }
+
   /**
    * This test validates the correctness of {@link FileUtil#readLink(File)} when
    * it gets a file in input.
@@ -41,6 +41,7 @@
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -540,21 +541,30 @@ private void scan() {
             m++;
             continue;
           }
-          // Block file and/or metadata file exists on the disk
-          // Block exists in memory
-          if (info.getBlockFile() == null) {
-            // Block metadata file exits and block file is missing
-            addDifference(diffRecord, statsRecord, info);
-          } else if (info.getGenStamp() != memBlock.getGenerationStamp()
-              || info.getBlockLength() != memBlock.getNumBytes()) {
-            // Block metadata file is missing or has wrong generation stamp,
-            // or block file length is different than expected
+          // Block and meta must be regular file
+          boolean isRegular = FileUtil.isRegularFile(info.getBlockFile(), false) &&
+              FileUtil.isRegularFile(info.getMetaFile(), false);
+          if (!isRegular) {
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
-          } else if (memBlock.compareWith(info) != 0) {
-            // volumeMap record and on-disk files do not match.
-            statsRecord.duplicateBlocks++;
-            addDifference(diffRecord, statsRecord, info);
+          } else {
+            // Block file and/or metadata file exists on the disk
+            // Block exists in memory
+            if (info.getBlockFile() == null) {
+              // Block metadata file exits and block file is missing
+              addDifference(diffRecord, statsRecord, info);
+            } else if (info.getGenStamp() != memBlock.getGenerationStamp()
+                || info.getBlockLength() != memBlock.getNumBytes()) {
+              // Block metadata file is missing or has wrong generation stamp,
+              // or block file length is different than expected
+              statsRecord.mismatchBlocks++;
+              addDifference(diffRecord, statsRecord, info);
+            } else if (memBlock.compareWith(info) != 0) {
+              // volumeMap record and on-disk files do not match.
+              statsRecord.duplicateBlocks++;
+              addDifference(diffRecord, statsRecord, info);
+            }
           }
           d++;
 
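With this change, scan() counts a block as a mismatch whenever its block file or meta file is not a regular file (for example, a symlink left in the block pool directory), before the existing generation-stamp and length comparisons run. A minimal standalone sketch of that guard follows; the class and method names are hypothetical, and a null file is treated as acceptable, mirroring FileUtil.isRegularFile returning true for null.

import java.io.File;
import java.nio.file.Files;
import java.nio.file.LinkOption;

public class ScanGuardSketch {
  // Sketch of FileUtil.isRegularFile(file, false): null is accepted,
  // symlinks are not followed.
  static boolean isRegularNoFollow(File f) {
    return f == null || Files.isRegularFile(f.toPath(), LinkOption.NOFOLLOW_LINKS);
  }

  // A replica is flagged as a mismatch unless both on-disk files are regular.
  static boolean shouldFlagAsMismatch(File blockFile, File metaFile) {
    return !(isRegularNoFollow(blockFile) && isRegularNoFollow(metaFile));
  }
}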
@@ -49,6 +49,7 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.classification.VisibleForTesting;
 
@@ -2645,6 +2646,9 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
         Block.getGenerationStamp(diskMetaFile.getName()) :
         HdfsConstants.GRANDFATHER_GENERATION_STAMP;
 
+    final boolean isRegular = FileUtil.isRegularFile(diskMetaFile, false) &&
+        FileUtil.isRegularFile(diskFile, false);
+
     if (vol.getStorageType() == StorageType.PROVIDED) {
       if (memBlockInfo == null) {
         // replica exists on provided store but not in memory
@@ -2812,6 +2816,9 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
               + memBlockInfo.getNumBytes() + " to "
               + memBlockInfo.getBlockDataLength());
           memBlockInfo.setNumBytes(memBlockInfo.getBlockDataLength());
+        } else if (!isRegular) {
+          corruptBlock = new Block(memBlockInfo);
+          LOG.warn("Block:{} is not a regular file.", corruptBlock.getBlockId());
         }
       } finally {
         if (dataNodeMetrics != null) {
@@ -49,6 +49,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -71,6 +73,7 @@
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.AutoCloseableLock;
@@ -507,6 +510,53 @@ public void testDeleteBlockOnTransientStorage() throws Exception {
     }
   }
 
+  @Test(timeout = 600000)
+  public void testRegularBlock() throws Exception {
+    Configuration conf = getConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      // log trace
+      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
+          captureLogs(NameNode.stateChangeLog);
+      // Add files with 5 blocks
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 5, false);
+
+      List<ReplicaInfo> infos = new ArrayList<>(FsDatasetTestUtil.getReplicas(fds, bpid));
+      ReplicaInfo lastReplica = infos.get(infos.size() - 1);
+      ReplicaInfo penultimateReplica = infos.get(infos.size() - 2);
+
+      String blockParent = new File(lastReplica.getBlockURI().getPath()).getParent();
+      File lastBlockFile = new File(blockParent, getBlockFile(lastReplica.getBlockId()));
+      File penultimateBlockFile = new File(blockParent,
+          getBlockFile(penultimateReplica.getBlockId()));
+      FileUtil.symLink(lastBlockFile.toString(), penultimateBlockFile.toString());
+      ExtendedBlock block = new ExtendedBlock(bpid, penultimateReplica.getBlockId());
+
+      scanner = new DirectoryScanner(fds, conf);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+      DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+      assertNotNull(stats);
+      assertEquals(1, stats.mismatchBlocks);
+
+      // check nn log
+      String msg = "*DIR* reportBadBlocks for block: " + bpid + ":" +
+          getBlockFile(block.getBlockId());
+      assertTrue(logCapturer.getOutput().contains(msg));
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout = 600000)
   public void testDirectoryScanner() throws Exception {
     // Run the test with and without parallel scanning