diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index aaf37aa09c..df24f9890d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -432,6 +432,9 @@ private synchronized void activateVolume( ReplicaMap replicaMap, Storage.StorageDirectory sd, StorageType storageType, FsVolumeReference ref) throws IOException { + for (String bp : volumeMap.getBlockPoolList()) { + lockManager.addLock(LockLevel.VOLUME, bp, ref.getVolume().getStorageID()); + } DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid()); if (dnStorage != null) { final String errorMsg = String.format( @@ -629,6 +632,9 @@ public void removeVolumes( synchronized (this) { for (String storageUuid : storageToRemove) { storageMap.remove(storageUuid); + for (String bp : volumeMap.getBlockPoolList()) { + lockManager.removeLock(LockLevel.VOLUME, bp, storageUuid); + } } } } @@ -906,8 +912,8 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid) @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseDataSetLock l = lockManager.readLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1372,8 +1378,8 @@ static void computeChecksum(ReplicaInfo srcReplica, File dstMeta, @Override // FsDatasetSpi public ReplicaHandler append(ExtendedBlock b, long newGS, long expectedBlockLen) 
throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { // If the block was successfully finalized because all packets // were successfully processed at the Datanode but the ack for // some of the packets were not received by the client. The client @@ -1425,7 +1431,8 @@ public ReplicaHandler append(ExtendedBlock b, private ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo, long newGS, long estimateBlockLen) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + bpid, replicaInfo.getStorageUuid())) { // If the block is cached, start uncaching it. if (replicaInfo.getState() != ReplicaState.FINALIZED) { throw new IOException("Only a Finalized replica can be appended to; " @@ -1554,8 +1561,8 @@ public Replica recoverClose(ExtendedBlock b, long newGS, LOG.info("Recover failed close " + b); while (true) { try { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { // check replica's state ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen); // bump the replica's GS @@ -1578,7 +1585,7 @@ public ReplicaHandler createRbw( StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, + try (AutoCloseableLock lock = lockManager.readLock(LockLevel.BLOCK_POOl, b.getBlockPoolId())) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); @@ -1626,20 +1633,20 @@ public 
ReplicaHandler createRbw( } ReplicaInPipeline newReplicaInfo; - try { + try (AutoCloseableLock l = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), v.getStorageID())) { newReplicaInfo = v.createRbw(b); if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) { throw new IOException("CreateRBW returned a replica of state " + newReplicaInfo.getReplicaInfo().getState() + " for block " + b.getBlockId()); } + volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); + return new ReplicaHandler(newReplicaInfo, ref); } catch (IOException e) { IOUtils.cleanupWithLogger(null, ref); throw e; } - - volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo()); - return new ReplicaHandler(newReplicaInfo, ref); } finally { if (dataNodeMetrics != null) { long createRbwMs = Time.monotonicNow() - startTimeMs; @@ -1657,8 +1664,8 @@ public ReplicaHandler recoverRbw( try { while (true) { try { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId()); // check the replica's state @@ -1689,8 +1696,8 @@ public ReplicaHandler recoverRbw( private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { // check generation stamp long replicaGenerationStamp = rbw.getGenerationStamp(); if (replicaGenerationStamp < b.getGenerationStamp() || @@ -1751,8 +1758,8 @@ private ReplicaHandler recoverRbwImpl(ReplicaInPipeline rbw, public ReplicaInPipeline convertTemporaryToRbw( final ExtendedBlock b) 
throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { final long blockId = b.getBlockId(); final long expectedGs = b.getGenerationStamp(); final long visible = b.getNumBytes(); @@ -1887,12 +1894,12 @@ public ReplicaHandler createTemporary(StorageType storageType, false); } long startHoldLockTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { - FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b - .getNumBytes()); - FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); - ReplicaInPipeline newReplicaInfo; + FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b + .getNumBytes()); + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); + ReplicaInPipeline newReplicaInfo; + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), v.getStorageID())) { try { newReplicaInfo = v.createTemporary(b); LOG.debug("creating temporary for block: {} on volume: {}", @@ -1949,8 +1956,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) ReplicaInfo replicaInfo = null; ReplicaInfo finalizedReplicaInfo = null; long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { if (Thread.interrupted()) { // Don't allow data modifications from interrupted threads throw new IOException("Cannot finalize block from Interrupted Thread"); @@ -1986,7 +1993,8 @@ public void finalizeBlock(ExtendedBlock b, boolean fsyncDir) private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) throws 
IOException { - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + bpid, replicaInfo.getStorageUuid())) { // Compare generation stamp of old and new replica before finalizing if (volumeMap.get(bpid, replicaInfo.getBlockId()).getGenerationStamp() > replicaInfo.getGenerationStamp()) { @@ -2032,8 +2040,8 @@ private ReplicaInfo finalizeReplica(String bpid, ReplicaInfo replicaInfo) @Override // FsDatasetSpi public void unfinalizeBlock(ExtendedBlock b) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - b.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + b.getBlockPoolId(), getReplicaInfo(b).getStorageUuid())) { ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); if (replicaInfo != null && @@ -2423,10 +2431,17 @@ private void cacheBlock(String bpid, long blockId) { long length, genstamp; Executor volumeExecutor; - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { - ReplicaInfo info = volumeMap.get(bpid, blockId); + ReplicaInfo info = volumeMap.get(bpid, blockId); + if (info == null) { + LOG.warn("Failed to cache block with id " + blockId + ", pool " + + bpid + ": ReplicaInfo not found."); + return; + } + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, bpid, + info.getStorageUuid())) { boolean success = false; try { + info = volumeMap.get(bpid, blockId); if (info == null) { LOG.warn("Failed to cache block with id " + blockId + ", pool " + bpid + ": ReplicaInfo not found."); @@ -2619,7 +2634,8 @@ public void checkAndUpdate(String bpid, ScanInfo scanInfo) curDirScannerNotifyCount = 0; lastDirScannerNotifyTime = startTimeMs; } - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + try (AutoCloseableLock lock = 
lockManager.writeLock(LockLevel.VOLUME, bpid, + vol.getStorageID())) { memBlockInfo = volumeMap.get(bpid, blockId); if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) { @@ -2860,7 +2876,14 @@ ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, Block block, long recoveryId, long xceiverStopTimeout) throws IOException { while (true) { try { - try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + ReplicaInfo replica = map.get(bpid, block.getBlockId()); + if (replica == null) { + return null; + } + LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId + + ", replica=" + replica); + try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid, + replica.getStorageUuid())) { return initReplicaRecoveryImpl(bpid, map, block, recoveryId); } } catch (MustStopExistingWriter e) { @@ -2875,7 +2898,14 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid, ReplicaMap map, lockManager) throws IOException { while (true) { try { - try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.BLOCK_POOl, bpid)) { + ReplicaInfo replica = map.get(bpid, block.getBlockId()); + if (replica == null) { + return null; + } + LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId + + ", replica=" + replica); + try (AutoCloseDataSetLock l = lockManager.writeLock(LockLevel.VOLUME, bpid, + replica.getStorageUuid())) { return initReplicaRecoveryImpl(bpid, map, block, recoveryId); } } catch (MustStopExistingWriter e) { @@ -2888,9 +2918,6 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map, Block block, long recoveryId) throws IOException, MustStopExistingWriter { final ReplicaInfo replica = map.get(bpid, block.getBlockId()); - LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId - + ", replica=" + replica); - //check replica if (replica == null) { return null; @@ -2964,8 +2991,8 @@ public Replica 
updateReplicaUnderRecovery( final long newBlockId, final long newlength) throws IOException { long startTimeMs = Time.monotonicNow(); - try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl, - oldBlock.getBlockPoolId())) { + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, + oldBlock.getBlockPoolId(), getReplicaInfo(oldBlock).getStorageUuid())) { //get replica final String bpid = oldBlock.getBlockPoolId(); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId()); @@ -3109,6 +3136,10 @@ public void addBlockPool(String bpid, Configuration conf) volumeExceptions.mergeException(e); } volumeMap.initBlockPool(bpid); + Set vols = storageMap.keySet(); + for (String v : vols) { + lockManager.addLock(LockLevel.VOLUME, bpid, v); + } } try { volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 0c0fe61817..d300eac4b6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -88,6 +88,7 @@ import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Time; import org.junit.Before; +import org.junit.After; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -157,6 +158,13 @@ public void setupMocks() throws Exception { Mockito.doReturn(dataSetLockManager).when(mockDn).getDataSetLockManager(); } + @After + public void checkDataSetLockManager() { + dataSetLockManager.lockLeakCheck(); + // make sure no lock Leak. + assertNull(dataSetLockManager.getLastException()); + } + /** * Set up a mock NN with the bare minimum for a DN to register to it. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 6c8e828f36..81cfa3e08b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; public class ExternalVolumeImpl implements FsVolumeSpi { + private final String defaultStroageId = "test"; @Override public FsVolumeReference obtainReference() throws ClosedChannelException { return null; @@ -54,7 +55,7 @@ public long getAvailable() throws IOException { @Override public String getStorageID() { - return null; + return defaultStroageId; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index f250eea292..1e4de75148 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -80,6 +80,7 @@ import org.apache.hadoop.util.StringUtils; import org.junit.Assert; import org.junit.Before; +import org.junit.After; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -236,6 +237,13 @@ public void setUp() throws IOException { assertEquals(0, dataset.getNumFailedVolumes()); } + @After + public void checkDataSetLockManager() { + manager.lockLeakCheck(); + // make sure no lock Leak. 
+ assertNull(manager.getLastException()); + } + @Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; @@ -687,6 +695,7 @@ public void testAddVolumeFailureReleasesInUseLock() throws IOException { FsDatasetImpl spyDataset = spy(dataset); FsVolumeImpl mockVolume = mock(FsVolumeImpl.class); File badDir = new File(BASE_DIR, "bad"); + when(mockVolume.getStorageID()).thenReturn("test"); badDir.mkdirs(); doReturn(mockVolume).when(spyDataset) .createFsVolume(anyString(), any(StorageDirectory.class), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java index 5290337e2f..659d53eda9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery; +import org.apache.hadoop.hdfs.server.datanode.extdataset.ExternalVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; @@ -218,7 +219,7 @@ private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception { } private static ReplicaInfo createReplicaInfo(Block b) { - return new FinalizedReplica(b, null, null); + return new FinalizedReplica(b, new ExternalVolumeImpl(), null); } private static void assertEquals(ReplicaInfo originalInfo, ReplicaRecoveryInfo recoveryInfo) { @@ -318,6 +319,10 
@@ public void testInitReplicaRecovery() throws IOException { "replica.getGenerationStamp() < block.getGenerationStamp(), block="); } } + + manager.lockLeakCheck(); + // Make sure no lock has leaked. + assertNull(manager.getLastException()); } /**