HDFS-7060. Avoid taking locks when sending heartbeats from the DataNode. Contributed by Jiandan Yang.

This commit is contained in:
Weiwei Yang 2017-11-08 10:22:13 +08:00
parent 51e882d5c9
commit bb8a6eea52
2 changed files with 37 additions and 54 deletions

View File

@ -153,22 +153,22 @@ public DatanodeStorage getStorage(final String storageUuid) {
public StorageReport[] getStorageReports(String bpid) public StorageReport[] getStorageReports(String bpid)
throws IOException { throws IOException {
List<StorageReport> reports; List<StorageReport> reports;
synchronized (statsLock) { // Volumes are the references from a copy-on-write snapshot, so the
List<FsVolumeImpl> curVolumes = volumes.getVolumes(); // access on the volume metrics doesn't require an additional lock.
reports = new ArrayList<>(curVolumes.size()); List<FsVolumeImpl> curVolumes = volumes.getVolumes();
for (FsVolumeImpl volume : curVolumes) { reports = new ArrayList<>(curVolumes.size());
try (FsVolumeReference ref = volume.obtainReference()) { for (FsVolumeImpl volume : curVolumes) {
StorageReport sr = new StorageReport(volume.toDatanodeStorage(), try (FsVolumeReference ref = volume.obtainReference()) {
false, StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
volume.getCapacity(), false,
volume.getDfsUsed(), volume.getCapacity(),
volume.getAvailable(), volume.getDfsUsed(),
volume.getBlockPoolUsed(bpid), volume.getAvailable(),
volume.getNonDfsUsed()); volume.getBlockPoolUsed(bpid),
reports.add(sr); volume.getNonDfsUsed());
} catch (ClosedChannelException e) { reports.add(sr);
continue; } catch (ClosedChannelException e) {
} continue;
} }
} }
@ -247,9 +247,6 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
private final int smallBufferSize; private final int smallBufferSize;
// Used for synchronizing access to usage stats
private final Object statsLock = new Object();
final LocalFileSystem localFS; final LocalFileSystem localFS;
private boolean blockPinningEnabled; private boolean blockPinningEnabled;
@ -583,9 +580,7 @@ public void removeVolumes(
*/ */
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDfsUsed() throws IOException { public long getDfsUsed() throws IOException {
synchronized(statsLock) { return volumes.getDfsUsed();
return volumes.getDfsUsed();
}
} }
/** /**
@ -593,9 +588,7 @@ public long getDfsUsed() throws IOException {
*/ */
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getBlockPoolUsed(String bpid) throws IOException { public long getBlockPoolUsed(String bpid) throws IOException {
synchronized(statsLock) { return volumes.getBlockPoolUsed(bpid);
return volumes.getBlockPoolUsed(bpid);
}
} }
/** /**
@ -611,9 +604,7 @@ public boolean hasEnoughResource() {
*/ */
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getCapacity() throws IOException { public long getCapacity() throws IOException {
synchronized(statsLock) { return volumes.getCapacity();
return volumes.getCapacity();
}
} }
/** /**
@ -621,9 +612,7 @@ public long getCapacity() throws IOException {
*/ */
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getRemaining() throws IOException { public long getRemaining() throws IOException {
synchronized(statsLock) { return volumes.getRemaining();
return volumes.getRemaining();
}
} }
/** /**

View File

@ -51,7 +51,6 @@
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -342,43 +341,38 @@ void onMetaFileDeletion(String bpid, long value) {
private void decDfsUsedAndNumBlocks(String bpid, long value, private void decDfsUsedAndNumBlocks(String bpid, long value,
boolean blockFileDeleted) { boolean blockFileDeleted) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { // BlockPoolSlice map is thread safe, and update the space used or
BlockPoolSlice bp = bpSlices.get(bpid); // number of blocks are atomic operations, so it doesn't require to
if (bp != null) { // hold the dataset lock.
bp.decDfsUsed(value); BlockPoolSlice bp = bpSlices.get(bpid);
if (blockFileDeleted) { if (bp != null) {
bp.decrNumBlocks(); bp.decDfsUsed(value);
} if (blockFileDeleted) {
bp.decrNumBlocks();
} }
} }
} }
void incDfsUsedAndNumBlocks(String bpid, long value) { void incDfsUsedAndNumBlocks(String bpid, long value) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { BlockPoolSlice bp = bpSlices.get(bpid);
BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) {
if (bp != null) { bp.incDfsUsed(value);
bp.incDfsUsed(value); bp.incrNumBlocks();
bp.incrNumBlocks();
}
} }
} }
void incDfsUsed(String bpid, long value) { void incDfsUsed(String bpid, long value) {
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { BlockPoolSlice bp = bpSlices.get(bpid);
BlockPoolSlice bp = bpSlices.get(bpid); if (bp != null) {
if (bp != null) { bp.incDfsUsed(value);
bp.incDfsUsed(value);
}
} }
} }
@VisibleForTesting @VisibleForTesting
public long getDfsUsed() throws IOException { public long getDfsUsed() throws IOException {
long dfsUsed = 0; long dfsUsed = 0;
try(AutoCloseableLock lock = dataset.acquireDatasetLock()) { for(BlockPoolSlice s : bpSlices.values()) {
for(BlockPoolSlice s : bpSlices.values()) { dfsUsed += s.getDfsUsed();
dfsUsed += s.getDfsUsed();
}
} }
return dfsUsed; return dfsUsed;
} }