HDFS-16534. Split FsDatasetImpl from block pool locks to volume grain locks. (#4141) Contributed by limingxiang.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
He Xiaoqiao 2022-04-17 19:21:27 +08:00
parent f14f305051
commit cb975c3df6
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
5 changed files with 95 additions and 41 deletions

View File

@ -432,6 +432,9 @@ private synchronized void activateVolume(
ReplicaMap replicaMap,
Storage.StorageDirectory sd, StorageType storageType,
FsVolumeReference ref) throws IOException {
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID());
}
DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
if (dnStorage != null) {
final String errorMsg = String.format(
@ -629,6 +632,9 @@ public void removeVolumes(
synchronized (this) {
for (String storageUuid : storageToRemove) {
storageMap.remove(storageUuid);
for (String bp : volumeMap.getBlockPoolList()) {
lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid);
}
}
}
}
@ -906,8 +912,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@ -1372,8 +1378,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta,
@Override // FsDatasetSpi
public ReplicaHandler append(ExtendedBlock b,
long newGS, long expectedBlockLen) throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// If the block was successfully finalized because all packets
// were successfully processed at the Datanode but the ack for
// some of the packets were not received by the client. The client
@ -1425,7 +1431,8 @@ public ReplicaHandler append(ExtendedBlock b,
private ReplicaInPipeline append(String bpid,
ReplicaInfo replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// If the block is cached, start uncaching it.
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
throw new IOException("Only a Finalized replica can be appended to; "
@ -1554,8 +1561,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
LOG.info("Recover failed close " + b);
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check replica's state
ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
// bump the replica's GS
@ -1578,7 +1585,7 @@ public ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
@ -1626,20 +1633,20 @@ public ReplicaHandler createRbw(
}
ReplicaInPipeline newReplicaInfo;
try {
try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
newReplicaInfo = v.createRbw(b);
if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
throw new IOException("CreateRBW returned a replica of state "
+ newReplicaInfo.getReplicaInfo().getState()
+ " for block " + b.getBlockId());
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} catch (IOException e) {
IOUtils.cleanupWithLogger(null, ref);
throw e;
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
} finally {
if (dataNodeMetrics != null) {
long createRbwMs = Time.monotonicNow() - startTimeMs;
@ -1657,8 +1664,8 @@ public ReplicaHandler recoverRbw(
try {
while (true) {
try {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo =
getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
// check the replica's state
@ -1689,8 +1696,8 @@ public ReplicaHandler recoverRbw(
private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
// check generation stamp
long replicaGenerationStamp = rbw.getGenerationStamp();
if (replicaGenerationStamp < b.getGenerationStamp() ||
@ -1751,8 +1758,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw,
public ReplicaInPipeline convertTemporaryToRbw(
final ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
final long blockId = b.getBlockId();
final long expectedGs = b.getGenerationStamp();
final long visible = b.getNumBytes();
@ -1887,12 +1894,12 @@ public ReplicaHandler createTemporary(StorageType storageType,
false);
}
long startHoldLockTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), v.getStorageID())) {
try {
newReplicaInfo = v.createTemporary(b);
LOG.debug("creating temporary for block: {} on volume: {}",
@ -1949,8 +1956,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
ReplicaInfo replicaInfo = null;
ReplicaInfo finalizedReplicaInfo = null;
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
if (Thread.interrupted()) {
// Don't allow data modifications from interrupted threads
throw new IOException("Cannot finalize block from Interrupted Thread");
@ -1986,7 +1993,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir)
private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
throws IOException {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
bpid, replicaInfo.getStorageUuid())) {
// Compare generation stamp of old and new replica before finalizing
if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp()
> replicaInfo.getGenerationStamp()) {
@ -2032,8 +2040,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo)
@Override // FsDatasetSpi
public void unfinalizeBlock(ExtendedBlock b) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
b.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getLocalBlock());
if (replicaInfo != null &&
@ -2423,10 +2431,17 @@ private void cacheBlock(String bpid, long blockId) {
long length, genstamp;
Executor volumeExecutor;
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
return;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
info.getStorageUuid())) {
boolean success = false;
try {
info = volumeMap.get(bpid, blockId);
if (info == null) {
LOG.warn("Failed to cache block with id " + blockId + ", pool " +
bpid + ": ReplicaInfo not found.");
@ -2619,7 +2634,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo)
curDirScannerNotifyCount = 0;
lastDirScannerNotifyTime = startTimeMs;
}
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid,
vol.getStorageID())) {
memBlockInfo = volumeMap.get(bpid, blockId);
if (memBlockInfo != null &&
memBlockInfo.getState() != ReplicaState.FINALIZED) {
@ -2860,7 +2876,14 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
Block block, long recoveryId, long xceiverStopTimeout) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
@ -2875,7 +2898,14 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map,
lockManager) throws IOException {
while (true) {
try {
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) {
ReplicaInfo replica = map.get(bpid, block.getBlockId());
if (replica == null) {
return null;
}
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid,
replica.getStorageUuid())) {
return initReplicaRecoveryImpl(bpid, map, block, recoveryId);
}
} catch (MustStopExistingWriter e) {
@ -2888,9 +2918,6 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
Block block, long recoveryId)
throws IOException, MustStopExistingWriter {
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
+ ", replica=" + replica);
//check replica
if (replica == null) {
return null;
@ -2964,8 +2991,8 @@ public Replica updateReplicaUnderRecovery(
final long newBlockId,
final long newlength) throws IOException {
long startTimeMs = Time.monotonicNow();
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
oldBlock.getBlockPoolId())) {
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME,
oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) {
//get replica
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
@ -3109,6 +3136,10 @@ public void addBlockPool(String bpid, Configuration conf)
volumeExceptions.mergeException(e);
}
volumeMap.initBlockPool(bpid);
Set<String> vols = storageMap.keySet();
for (String v : vols) {
lockManager.addLock(LockLevel.VOLUME, bpid, v);
}
}
try {
volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);

View File

@ -88,6 +88,7 @@
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
@ -157,6 +158,13 @@ public void setupMocks() throws Exception {
Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager();
}
@After
public void checkDataSetLockManager() {
dataSetLockManager.lockLeakCheck();
// make sure no lock Leak.
assertNull(dataSetLockManager.getLastException());
}
/**
* Set up a mock NN with the bare minimum for a DN to register to it.
*/

View File

@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
public class ExternalVolumeImpl implements FsVolumeSpi {
private final String defaultStroageId = "test";
@Override
public FsVolumeReference obtainReference() throws ClosedChannelException {
return null;
@ -54,7 +55,7 @@ public long getAvailable() throws IOException {
@Override
public String getStorageID() {
return null;
return defaultStroageId;
}
@Override

View File

@ -80,6 +80,7 @@
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -236,6 +237,13 @@ public void setUp() throws IOException {
assertEquals(0, dataset.getNumFailedVolumes());
}
@After
public void checkDataSetLockManager() {
manager.lockLeakCheck();
// make sure no lock Leak.
assertNull(manager.getLastException());
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
@ -687,6 +695,7 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset);
FsVolumeImpl mockVolume = mock(FsVolumeImpl.class);
File badDir = new File(BASE_DIR, "bad");
when(mockVolume.getStorageID()).thenReturn("test");
badDir.mkdirs();
doReturn(mockVolume).when(spyDataset)
.createFsVolume(anyString(), any(StorageDirectory.class),

View File

@ -51,6 +51,7 @@
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.extdataset.ExternalVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
@ -218,7 +219,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
}
private static ReplicaInfo createReplicaInfo(Block b) {
return new FinalizedReplica(b, null, null);
return new FinalizedReplica(b, new ExternalVolumeImpl(), null);
}
private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) {
@ -318,6 +319,10 @@ public void testInitReplicaRecovery() throws IOException {
"replica.getGenerationStamp() < block.getGenerationStamp(), block=");
}
}
manager.lockLeakCheck();
// make sure no lock Leak.
assertNull(manager.getLastException());
}
/**