HDFS-15415. Reduce locking in Datanode DirectoryScanner. Contributed by Stephen O'Donnell

This commit is contained in:
S O'Donnell 2020-09-22 12:00:02 +01:00
parent c3cb86ba42
commit 5f321df0a0

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
@ -472,87 +471,85 @@ public class DirectoryScanner implements Runnable {
// Pre-sort the reports outside of the lock
blockPoolReport.sortBlocks();
// Hold FSDataset lock to prevent further changes to the block map
try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
Collection<ScanInfo> diffRecord = new ArrayList<>();
Stats statsRecord = new Stats(bpid);
stats.put(bpid, statsRecord);
Collection<ScanInfo> diffRecord = new ArrayList<>();
statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);
statsRecord.totalBlocks = blockpoolReport.size();
final List<ReplicaInfo> bl;
bl = dataset.getSortedFinalizedBlocks(bpid);
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < bl.size() && d < blockpoolReport.size()) {
ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport.get(d);
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getVolume().getStorageType() != StorageType.PROVIDED
&& info.getBlockFile() == null) {
// Block metadata file exits and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
int d = 0; // index for blockpoolReport
int m = 0; // index for memReprot
while (m < bl.size() && d < blockpoolReport.size()) {
ReplicaInfo memBlock = bl.get(m);
ScanInfo info = blockpoolReport.get(d);
if (info.getBlockId() < memBlock.getBlockId()) {
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
// Block is missing in memory
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
continue;
}
if (info.getBlockId() > memBlock.getBlockId()) {
// Block is missing on the disk
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
info.getVolume());
m++;
continue;
}
// Block file and/or metadata file exists on the disk
// Block exists in memory
if (info.getVolume().getStorageType() != StorageType.PROVIDED
&& info.getBlockFile() == null) {
// Block metadata file exits and block file is missing
addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected
statsRecord.mismatchBlocks++;
addDifference(diffRecord, statsRecord, info);
} else if (memBlock.compareWith(info) != 0) {
// volumeMap record and on-disk files do not match.
statsRecord.duplicateBlocks++;
addDifference(diffRecord, statsRecord, info);
}
d++;
if (d < blockpoolReport.size()) {
// There may be multiple on-disk records for the same block, do not
// increment the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport.get(d);
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
if (d < blockpoolReport.size()) {
// There may be multiple on-disk records for the same block, do not
// increment the memory record pointer if so.
ScanInfo nextInfo = blockpoolReport.get(d);
if (nextInfo.getBlockId() != info.getBlockId()) {
++m;
}
} else {
++m;
}
while (m < bl.size()) {
ReplicaInfo current = bl.get(m++);
addDifference(diffRecord, statsRecord, current.getBlockId(),
current.getVolume());
}
while (d < blockpoolReport.size()) {
if (!dataset.isDeletingBlock(bpid,
blockpoolReport.get(d).getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
}
d++;
}
synchronized (diffs) {
diffs.addAll(bpid, diffRecord);
}
LOG.info("Scan Results: {}", statsRecord);
}
while (m < bl.size()) {
ReplicaInfo current = bl.get(m++);
addDifference(diffRecord, statsRecord, current.getBlockId(),
current.getVolume());
}
while (d < blockpoolReport.size()) {
if (!dataset.isDeletingBlock(bpid,
blockpoolReport.get(d).getBlockId())) {
statsRecord.missingMemoryBlocks++;
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
}
d++;
}
synchronized (diffs) {
diffs.addAll(bpid, diffRecord);
}
LOG.info("Scan Results: {}", statsRecord);
}
}