HDFS-16985. Fix data missing issue when delete local block file. (#5564). Contributed by Chengwei Wang.
Reviewed-by: Shuyan Zhang <zqingchai@gmail.com> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
e0938b4c2a
commit
251439d769
@ -36,7 +36,6 @@ import org.apache.hadoop.fs.ChecksumException;
|
|||||||
import org.apache.hadoop.fs.FsTracer;
|
import org.apache.hadoop.fs.FsTracer;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||||
@ -351,11 +350,8 @@ class BlockSender implements java.io.Closeable {
|
|||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
if ((e.getMessage() != null) && !(e.getMessage()
|
if ((e.getMessage() != null) && !(e.getMessage()
|
||||||
.contains("Too many open files"))) {
|
.contains("Too many open files"))) {
|
||||||
// The replica is on its volume map but not on disk
|
datanode.data.invalidateMissingBlock(block.getBlockPoolId(),
|
||||||
datanode
|
block.getLocalBlock());
|
||||||
.notifyNamenodeDeletedBlock(block, replica.getStorageUuid());
|
|
||||||
datanode.data.invalidate(block.getBlockPoolId(),
|
|
||||||
new Block[] {block.getLocalBlock()});
|
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -476,7 +476,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||||||
void invalidate(String bpid, Block invalidBlks[]) throws IOException;
|
void invalidate(String bpid, Block invalidBlks[]) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Caches the specified blocks
|
* Invalidate a block which is not found on disk.
|
||||||
|
* @param bpid the block pool ID.
|
||||||
|
* @param block The block to be invalidated.
|
||||||
|
*/
|
||||||
|
void invalidateMissingBlock(String bpid, Block block) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Caches the specified block
|
||||||
* @param bpid Block pool id
|
* @param bpid Block pool id
|
||||||
* @param blockIds - block ids to cache
|
* @param blockIds - block ids to cache
|
||||||
*/
|
*/
|
||||||
|
@ -2395,6 +2395,30 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||||||
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
||||||
block.getStorageUuid());
|
block.getStorageUuid());
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Invalidate a block which is not found on disk. We should remove it from
|
||||||
|
* memory and notify namenode, but unnecessary to delete the actual on-disk
|
||||||
|
* block file again.
|
||||||
|
*
|
||||||
|
* @param bpid the block pool ID.
|
||||||
|
* @param block The block to be invalidated.
|
||||||
|
*/
|
||||||
|
public void invalidateMissingBlock(String bpid, Block block) {
|
||||||
|
|
||||||
|
// The replica seems is on its volume map but not on disk.
|
||||||
|
// We can't confirm here is block file lost or disk failed.
|
||||||
|
// If block lost:
|
||||||
|
// deleted local block file is completely unnecessary
|
||||||
|
// If disk failed:
|
||||||
|
// deleted local block file here may lead to missing-block
|
||||||
|
// when it with only 1 replication left now.
|
||||||
|
// So remove if from volume map notify namenode is ok.
|
||||||
|
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
|
||||||
|
bpid)) {
|
||||||
|
ReplicaInfo replica = volumeMap.remove(bpid, block);
|
||||||
|
invalidate(bpid, replica);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove Replica from ReplicaMap.
|
* Remove Replica from ReplicaMap.
|
||||||
|
@ -1018,6 +1018,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void invalidateMissingBlock(String bpid, Block block)
|
||||||
|
throws IOException {
|
||||||
|
this.invalidate(bpid, new Block[]{block});
|
||||||
|
}
|
||||||
|
|
||||||
@Override // FSDatasetSpi
|
@Override // FSDatasetSpi
|
||||||
public void cache(String bpid, long[] cacheBlks) {
|
public void cache(String bpid, long[] cacheBlks) {
|
||||||
throw new UnsupportedOperationException(
|
throw new UnsupportedOperationException(
|
||||||
|
@ -230,6 +230,10 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||||||
public void invalidate(String bpid, Block[] invalidBlks) throws IOException {
|
public void invalidate(String bpid, Block[] invalidBlks) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void invalidateMissingBlock(String bpid, Block block) {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cache(String bpid, long[] blockIds) {
|
public void cache(String bpid, long[] blockIds) {
|
||||||
}
|
}
|
||||||
|
@ -1919,4 +1919,63 @@ public class TestFsDatasetImpl {
|
|||||||
DataNodeFaultInjector.set(oldInjector);
|
DataNodeFaultInjector.set(oldInjector);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the block file which is not found when disk with some exception.
|
||||||
|
* We expect:
|
||||||
|
* 1. block file wouldn't be deleted from disk.
|
||||||
|
* 2. block info would be removed from dn memory.
|
||||||
|
* 3. block would be reported to nn as missing block.
|
||||||
|
* 4. block would be recovered when disk back to normal.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void tesInvalidateMissingBlock() throws Exception {
|
||||||
|
long blockSize = 1024;
|
||||||
|
int heatbeatInterval = 1;
|
||||||
|
HdfsConfiguration c = new HdfsConfiguration();
|
||||||
|
c.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, heatbeatInterval);
|
||||||
|
c.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).
|
||||||
|
numDataNodes(1).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
DFSTestUtil.createFile(cluster.getFileSystem(), new Path("/a"),
|
||||||
|
blockSize, (short)1, 0);
|
||||||
|
|
||||||
|
String bpid = cluster.getNameNode().getNamesystem().getBlockPoolId();
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
FsDatasetImpl fsdataset = (FsDatasetImpl) dn.getFSDataset();
|
||||||
|
List<ReplicaInfo> replicaInfos = fsdataset.getFinalizedBlocks(bpid);
|
||||||
|
assertEquals(1, replicaInfos.size());
|
||||||
|
|
||||||
|
ReplicaInfo replicaInfo = replicaInfos.get(0);
|
||||||
|
String blockPath = replicaInfo.getBlockURI().getPath();
|
||||||
|
String metaPath = replicaInfo.getMetadataURI().getPath();
|
||||||
|
File blockFile = new File(blockPath);
|
||||||
|
File metaFile = new File(metaPath);
|
||||||
|
|
||||||
|
// Mock local block file not found when disk with some exception.
|
||||||
|
fsdataset.invalidateMissingBlock(bpid, replicaInfo);
|
||||||
|
|
||||||
|
// Assert local block file wouldn't be deleted from disk.
|
||||||
|
assertTrue(blockFile.exists());
|
||||||
|
// Assert block info would be removed from ReplicaMap.
|
||||||
|
assertEquals("null",
|
||||||
|
fsdataset.getReplicaString(bpid, replicaInfo.getBlockId()));
|
||||||
|
BlockManager blockManager = cluster.getNameNode().
|
||||||
|
getNamesystem().getBlockManager();
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
blockManager.getLowRedundancyBlocksCount() == 1, 100, 5000);
|
||||||
|
|
||||||
|
// Mock local block file found when disk back to normal.
|
||||||
|
FsVolumeSpi.ScanInfo info = new FsVolumeSpi.ScanInfo(
|
||||||
|
replicaInfo.getBlockId(), blockFile.getParentFile().getAbsoluteFile(),
|
||||||
|
blockFile.getName(), metaFile.getName(), replicaInfo.getVolume());
|
||||||
|
fsdataset.checkAndUpdate(bpid, info);
|
||||||
|
GenericTestUtils.waitFor(() ->
|
||||||
|
blockManager.getLowRedundancyBlocksCount() == 0, 100, 5000);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user