HDFS-17342. Fix DataNode possibly invalidating a normal block and causing a missing block (#6464). Contributed by Haiyang Hu.
Reviewed-by: ZanderXu <zanderxu@apache.org> Reviewed-by: Chengwei Wang <1139557635@qq.com> Signed-off-by: Shuyan Zhang <zhangshuyan@apache.org>
This commit is contained in:
parent
9a7eeadaac
commit
5ad7737132
@ -167,4 +167,9 @@ public void delayDeleteReplica() {}
|
||||
* Just delay running the diff record for a while.
|
||||
*/
|
||||
public void delayDiffRecord() {}
|
||||
|
||||
/**
|
||||
* Just delay getMetaDataInputStream for a while.
|
||||
*/
|
||||
public void delayGetMetaDataInputStream() {}
|
||||
}
|
||||
|
@ -63,6 +63,7 @@
|
||||
import org.apache.hadoop.hdfs.server.common.AutoCloseDataSetLock;
|
||||
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.DataNodeLockManager.LockLevel;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataSetLockManager;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
@ -247,6 +248,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
if (info == null || !info.metadataExists()) {
|
||||
return null;
|
||||
}
|
||||
DataNodeFaultInjector.get().delayGetMetaDataInputStream();
|
||||
return info.getMetadataInputStream(0);
|
||||
}
|
||||
|
||||
@ -2403,8 +2405,9 @@ public void invalidate(String bpid, ReplicaInfo block) {
|
||||
*
|
||||
* @param bpid the block pool ID.
|
||||
* @param block The block to be invalidated.
|
||||
* @param checkFiles Whether to check data and meta files.
|
||||
*/
|
||||
public void invalidateMissingBlock(String bpid, Block block) {
|
||||
public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles) {
|
||||
|
||||
// The replica seems to be on its volume map but not on disk.
|
||||
// We can't confirm here whether the block file is lost or the disk has failed.
|
||||
@ -2416,10 +2419,20 @@ public void invalidateMissingBlock(String bpid, Block block) {
|
||||
// So removing it from the volume map and notifying the namenode is ok.
|
||||
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
|
||||
bpid)) {
|
||||
ReplicaInfo replica = volumeMap.remove(bpid, block);
|
||||
// Check if this block is on the volume map.
|
||||
ReplicaInfo replica = volumeMap.get(bpid, block);
|
||||
// Double-check block or meta file existence when checkFiles is true.
|
||||
if (replica != null && (!checkFiles ||
|
||||
(!replica.blockDataExists() || !replica.metadataExists()))) {
|
||||
volumeMap.remove(bpid, block);
|
||||
invalidate(bpid, replica);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void invalidateMissingBlock(String bpid, Block block) {
|
||||
invalidateMissingBlock(bpid, block, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove Replica from ReplicaMap.
|
||||
|
@ -1962,7 +1962,7 @@ public void delayDeleteReplica() {
|
||||
* 4. block would be recovered when disk back to normal.
|
||||
*/
|
||||
@Test
|
||||
public void tesInvalidateMissingBlock() throws Exception {
|
||||
public void testInvalidateMissingBlock() throws Exception {
|
||||
long blockSize = 1024;
|
||||
int heartbeatInterval = 1;
|
||||
HdfsConfiguration c = new HdfsConfiguration();
|
||||
@ -1988,7 +1988,7 @@ public void tesInvalidateMissingBlock() throws Exception {
|
||||
File metaFile = new File(metaPath);
|
||||
|
||||
// Mock local block file not found when disk with some exception.
|
||||
fsdataset.invalidateMissingBlock(bpid, replicaInfo);
|
||||
fsdataset.invalidateMissingBlock(bpid, replicaInfo, false);
|
||||
|
||||
// Assert local block file wouldn't be deleted from disk.
|
||||
assertTrue(blockFile.exists());
|
||||
@ -2011,4 +2011,95 @@ public void tesInvalidateMissingBlock() throws Exception {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckFilesWhenInvalidateMissingBlock() throws Exception {
|
||||
long blockSize = 1024;
|
||||
int heartbeatInterval = 1;
|
||||
HdfsConfiguration c = new HdfsConfiguration();
|
||||
c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heartbeatInterval);
|
||||
c.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).
|
||||
numDataNodes(1).build();
|
||||
DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer.
|
||||
captureLogs(DataNode.LOG);
|
||||
BlockReaderTestUtil util = new BlockReaderTestUtil(cluster, new
|
||||
HdfsConfiguration(conf));
|
||||
Path path = new Path("/testFile");
|
||||
util.writeFile(path, 1);
|
||||
String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
FsDatasetImpl dnFSDataset = (FsDatasetImpl) dn.getFSDataset();
|
||||
List<ReplicaInfo> replicaInfos = dnFSDataset.getFinalizedBlocks(bpid);
|
||||
assertEquals(1, replicaInfos.size());
|
||||
DFSTestUtil.readFile(cluster.getFileSystem(), path);
|
||||
LocatedBlock blk = util.getFileBlocks(path, 512).get(0);
|
||||
ExtendedBlock block = blk.getBlock();
|
||||
|
||||
// Append a new block with an incremented generation stamp.
|
||||
long newGS = block.getGenerationStamp() + 1;
|
||||
dnFSDataset.append(block, newGS, 1024);
|
||||
block.setGenerationStamp(newGS);
|
||||
ReplicaInfo tmpReplicaInfo = dnFSDataset.getReplicaInfo(blk.getBlock());
|
||||
|
||||
DataNodeFaultInjector injector = new DataNodeFaultInjector() {
|
||||
@Override
|
||||
public void delayGetMetaDataInputStream() {
|
||||
try {
|
||||
Thread.sleep(8000);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore exception.
|
||||
}
|
||||
}
|
||||
};
|
||||
// Delay getMetaDataInputStream.
|
||||
DataNodeFaultInjector.set(injector);
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
Future<?> blockReaderFuture = executorService.submit(() -> {
|
||||
try {
|
||||
// Submit tasks for reading block.
|
||||
BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
|
||||
cluster.getFileSystem(), blk, 0, 512);
|
||||
blockReader.close();
|
||||
} catch (IOException e) {
|
||||
// Ignore exception.
|
||||
}
|
||||
});
|
||||
|
||||
Future<?> finalizeBlockFuture = executorService.submit(() -> {
|
||||
try {
|
||||
// Submit tasks for finalizing block.
|
||||
Thread.sleep(1000);
|
||||
dnFSDataset.finalizeBlock(block, false);
|
||||
} catch (Exception e) {
|
||||
// Ignore exception
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for both tasks to complete.
|
||||
blockReaderFuture.get();
|
||||
finalizeBlockFuture.get();
|
||||
} finally {
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
// Validate that the replica exists.
|
||||
assertNotNull(dnFSDataset.getReplicaInfo(blk.getBlock()));
|
||||
|
||||
// Check DN log for FileNotFoundException.
|
||||
String expectedMsg = String.format("opReadBlock %s received exception " +
|
||||
"java.io.FileNotFoundException: %s (No such file or directory)",
|
||||
blk.getBlock(), tmpReplicaInfo.getMetadataURI().getPath());
|
||||
assertTrue("Expected log message not found in DN log.",
|
||||
logCapturer.getOutput().contains(expectedMsg));
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
DataNodeFaultInjector.set(oldDnInjector);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user