HDFS-15415. Reduce locking in Datanode DirectoryScanner. Contributed by Stephen O'Donnell
This commit is contained in:
parent
425f48799c
commit
20a0e6278d
@ -46,7 +46,6 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
|
||||
import org.apache.hadoop.util.AutoCloseableLock;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.StopWatch;
|
||||
import org.slf4j.Logger;
|
||||
@ -472,87 +471,85 @@ private void scan() {
|
||||
// Pre-sort the reports outside of the lock
|
||||
blockPoolReport.sortBlocks();
|
||||
|
||||
// Hold FSDataset lock to prevent further changes to the block map
|
||||
try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
|
||||
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
|
||||
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
|
||||
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
|
||||
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);
|
||||
|
||||
Stats statsRecord = new Stats(bpid);
|
||||
stats.put(bpid, statsRecord);
|
||||
Collection<ScanInfo> diffRecord = new ArrayList<>();
|
||||
Stats statsRecord = new Stats(bpid);
|
||||
stats.put(bpid, statsRecord);
|
||||
Collection<ScanInfo> diffRecord = new ArrayList<>();
|
||||
|
||||
statsRecord.totalBlocks = blockpoolReport.size();
|
||||
final List<ReplicaInfo> bl = dataset.getSortedFinalizedBlocks(bpid);
|
||||
statsRecord.totalBlocks = blockpoolReport.size();
|
||||
final List<ReplicaInfo> bl;
|
||||
bl = dataset.getSortedFinalizedBlocks(bpid);
|
||||
|
||||
int d = 0; // index for blockpoolReport
|
||||
int m = 0; // index for memReprot
|
||||
while (m < bl.size() && d < blockpoolReport.size()) {
|
||||
ReplicaInfo memBlock = bl.get(m);
|
||||
ScanInfo info = blockpoolReport.get(d);
|
||||
if (info.getBlockId() < memBlock.getBlockId()) {
|
||||
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
|
||||
// Block is missing in memory
|
||||
statsRecord.missingMemoryBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
}
|
||||
d++;
|
||||
continue;
|
||||
}
|
||||
if (info.getBlockId() > memBlock.getBlockId()) {
|
||||
// Block is missing on the disk
|
||||
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
|
||||
info.getVolume());
|
||||
m++;
|
||||
continue;
|
||||
}
|
||||
// Block file and/or metadata file exists on the disk
|
||||
// Block exists in memory
|
||||
if (info.getVolume().getStorageType() != StorageType.PROVIDED
|
||||
&& info.getBlockFile() == null) {
|
||||
// Block metadata file exits and block file is missing
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|
||||
|| info.getBlockLength() != memBlock.getNumBytes()) {
|
||||
// Block metadata file is missing or has wrong generation stamp,
|
||||
// or block file length is different than expected
|
||||
statsRecord.mismatchBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
} else if (memBlock.compareWith(info) != 0) {
|
||||
// volumeMap record and on-disk files do not match.
|
||||
statsRecord.duplicateBlocks++;
|
||||
int d = 0; // index for blockpoolReport
|
||||
int m = 0; // index for memReprot
|
||||
while (m < bl.size() && d < blockpoolReport.size()) {
|
||||
ReplicaInfo memBlock = bl.get(m);
|
||||
ScanInfo info = blockpoolReport.get(d);
|
||||
if (info.getBlockId() < memBlock.getBlockId()) {
|
||||
if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
|
||||
// Block is missing in memory
|
||||
statsRecord.missingMemoryBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
}
|
||||
d++;
|
||||
continue;
|
||||
}
|
||||
if (info.getBlockId() > memBlock.getBlockId()) {
|
||||
// Block is missing on the disk
|
||||
addDifference(diffRecord, statsRecord, memBlock.getBlockId(),
|
||||
info.getVolume());
|
||||
m++;
|
||||
continue;
|
||||
}
|
||||
// Block file and/or metadata file exists on the disk
|
||||
// Block exists in memory
|
||||
if (info.getVolume().getStorageType() != StorageType.PROVIDED
|
||||
&& info.getBlockFile() == null) {
|
||||
// Block metadata file exits and block file is missing
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
} else if (info.getGenStamp() != memBlock.getGenerationStamp()
|
||||
|| info.getBlockLength() != memBlock.getNumBytes()) {
|
||||
// Block metadata file is missing or has wrong generation stamp,
|
||||
// or block file length is different than expected
|
||||
statsRecord.mismatchBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
} else if (memBlock.compareWith(info) != 0) {
|
||||
// volumeMap record and on-disk files do not match.
|
||||
statsRecord.duplicateBlocks++;
|
||||
addDifference(diffRecord, statsRecord, info);
|
||||
}
|
||||
d++;
|
||||
|
||||
if (d < blockpoolReport.size()) {
|
||||
// There may be multiple on-disk records for the same block, do not
|
||||
// increment the memory record pointer if so.
|
||||
ScanInfo nextInfo = blockpoolReport.get(d);
|
||||
if (nextInfo.getBlockId() != info.getBlockId()) {
|
||||
++m;
|
||||
}
|
||||
} else {
|
||||
if (d < blockpoolReport.size()) {
|
||||
// There may be multiple on-disk records for the same block, do not
|
||||
// increment the memory record pointer if so.
|
||||
ScanInfo nextInfo = blockpoolReport.get(d);
|
||||
if (nextInfo.getBlockId() != info.getBlockId()) {
|
||||
++m;
|
||||
}
|
||||
} else {
|
||||
++m;
|
||||
}
|
||||
while (m < bl.size()) {
|
||||
ReplicaInfo current = bl.get(m++);
|
||||
addDifference(diffRecord, statsRecord, current.getBlockId(),
|
||||
current.getVolume());
|
||||
}
|
||||
while (d < blockpoolReport.size()) {
|
||||
if (!dataset.isDeletingBlock(bpid,
|
||||
blockpoolReport.get(d).getBlockId())) {
|
||||
statsRecord.missingMemoryBlocks++;
|
||||
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
|
||||
}
|
||||
d++;
|
||||
}
|
||||
synchronized (diffs) {
|
||||
diffs.addAll(bpid, diffRecord);
|
||||
}
|
||||
LOG.info("Scan Results: {}", statsRecord);
|
||||
}
|
||||
while (m < bl.size()) {
|
||||
ReplicaInfo current = bl.get(m++);
|
||||
addDifference(diffRecord, statsRecord, current.getBlockId(),
|
||||
current.getVolume());
|
||||
}
|
||||
while (d < blockpoolReport.size()) {
|
||||
if (!dataset.isDeletingBlock(bpid,
|
||||
blockpoolReport.get(d).getBlockId())) {
|
||||
statsRecord.missingMemoryBlocks++;
|
||||
addDifference(diffRecord, statsRecord, blockpoolReport.get(d));
|
||||
}
|
||||
d++;
|
||||
}
|
||||
synchronized (diffs) {
|
||||
diffs.addAll(bpid, diffRecord);
|
||||
}
|
||||
LOG.info("Scan Results: {}", statsRecord);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user