diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index ac553970b4..e502f1cafa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -115,6 +115,7 @@
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
@@ -2877,7 +2878,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
     final BlockConstructionStage stage;

     //get replica information
-    synchronized(data) {
+    try(AutoCloseableLock lock = data.acquireDatasetLock()) {
       Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
           b.getBlockId());
       if (null == storedBlock) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 1db445e416..7c8985ec8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -583,7 +584,7 @@ private void scan() {
     Map<String, ScanInfo[]> diskReport = getDiskReport();

     // Hold FSDataset lock to prevent further changes to the block map
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
         String bpid = entry.getKey();
         ScanInfo[] blockpoolReport = entry.getValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index c73b10fdb8..1f61861c07 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -22,6 +22,7 @@
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus
@@ -454,7 +455,7 @@ private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
     Map<String, FsVolumeSpi> pathMap = new HashMap<>();
     FsDatasetSpi.FsVolumeReferences references;
     try {
-      synchronized (this.dataset) {
+      try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
         references = this.dataset.getFsVolumeReferences();
         for (int ndx = 0; ndx < references.size(); ndx++) {
           FsVolumeSpi vol = references.get(ndx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index eeab098b6f..acc269af87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -639,4 +640,9 @@ ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
    */
   ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
       FsVolumeSpi destination) throws IOException;
+
+  /**
+   * Acquire the lock of the data set.
+   */
+  AutoCloseableLock acquireDatasetLock();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index c0f2fbdb7b..c4d924e8dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -175,21 +176,26 @@ public StorageReport[] getStorageReports(String bpid)
   }

   @Override
-  public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
-    final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
-    return r != null? (FsVolumeImpl)r.getVolume(): null;
+  public FsVolumeImpl getVolume(final ExtendedBlock b) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final ReplicaInfo r =
+          volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
+      return r != null ? (FsVolumeImpl) r.getVolume() : null;
+    }
   }

   @Override // FsDatasetSpi
-  public synchronized Block getStoredBlock(String bpid, long blkid)
+  public Block getStoredBlock(String bpid, long blkid)
       throws IOException {
-    File blockfile = getFile(bpid, blkid, false);
-    if (blockfile == null) {
-      return null;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      File blockfile = getFile(bpid, blkid, false);
+      if (blockfile == null) {
+        return null;
+      }
+      final File metafile = FsDatasetUtil.findMetaFile(blockfile);
+      final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
+      return new Block(blkid, blockfile.length(), gs);
     }
-    final File metafile = FsDatasetUtil.findMetaFile(blockfile);
-    final long gs = FsDatasetUtil.parseGenerationStamp(blockfile, metafile);
-    return new Block(blkid, blockfile.length(), gs);
   }
@@ -258,6 +264,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   private boolean blockPinningEnabled;
   private final int maxDataLength;

+
+  private final AutoCloseableLock datasetLock;

   /**
    * An FSDataset has a directory where it loads its data files.
@@ -269,6 +277,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     this.dataStorage = storage;
     this.conf = conf;
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+    this.datasetLock = new AutoCloseableLock();
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -341,6 +350,11 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
+
   /**
    * Gets initial volume failure information for all volumes that failed
    * immediately at startup.  The method works by determining the set difference
@@ -375,25 +389,27 @@ private static List<VolumeFailureInfo> getInitialVolumeFailureInfos(
    * Activate a volume to serve requests.
    * @throws IOException if the storage UUID already exists.
    */
-  private synchronized void activateVolume(
+  private void activateVolume(
       ReplicaMap replicaMap,
       Storage.StorageDirectory sd, StorageType storageType,
       FsVolumeReference ref) throws IOException {
-    DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
-    if (dnStorage != null) {
-      final String errorMsg = String.format(
-          "Found duplicated storage UUID: %s in %s.",
-          sd.getStorageUuid(), sd.getVersionFile());
-      LOG.error(errorMsg);
-      throw new IOException(errorMsg);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      DatanodeStorage dnStorage = storageMap.get(sd.getStorageUuid());
+      if (dnStorage != null) {
+        final String errorMsg = String.format(
+            "Found duplicated storage UUID: %s in %s.",
+            sd.getStorageUuid(), sd.getVersionFile());
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+      volumeMap.addAll(replicaMap);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(),
+              DatanodeStorage.State.NORMAL,
+              storageType));
+      asyncDiskService.addVolume(sd.getCurrentDir());
+      volumes.addVolume(ref);
     }
-    volumeMap.addAll(replicaMap);
-    storageMap.put(sd.getStorageUuid(),
-        new DatanodeStorage(sd.getStorageUuid(),
-            DatanodeStorage.State.NORMAL,
-            storageType));
-    asyncDiskService.addVolume(sd.getCurrentDir());
-    volumes.addVolume(ref);
   }

   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -488,7 +504,7 @@ public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
         final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -534,7 +550,7 @@ public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
       }
     }

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for(String storageUuid : storageToRemove) {
         storageMap.remove(storageUuid);
       }
@@ -743,7 +759,7 @@ private File getBlockFileNoExistsCheck(ExtendedBlock b,
       boolean touch)
       throws IOException {
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
     }
     if (f == null) {
@@ -809,22 +825,25 @@ private ReplicaInfo getReplicaInfo(String bpid, long blkid)
    * Returns handles to the block file and its metadata file
    */
   @Override // FsDatasetSpi
-  public synchronized ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
+  public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
       long blkOffset, long metaOffset) throws IOException {
-    ReplicaInfo info = getReplicaInfo(b);
-    FsVolumeReference ref = info.getVolume().obtainReference();
-    try {
-      InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo info = getReplicaInfo(b);
+      FsVolumeReference ref = info.getVolume().obtainReference();
       try {
-        InputStream metaInStream = openAndSeek(info.getMetaFile(), metaOffset);
-        return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        InputStream blockInStream = openAndSeek(info.getBlockFile(), blkOffset);
+        try {
+          InputStream metaInStream =
+              openAndSeek(info.getMetaFile(), metaOffset);
+          return new ReplicaInputStreams(blockInStream, metaInStream, ref);
+        } catch (IOException e) {
+          IOUtils.cleanup(null, blockInStream);
+          throw e;
+        }
       } catch (IOException e) {
-        IOUtils.cleanup(null, blockInStream);
+        IOUtils.cleanup(null, ref);
         throw e;
       }
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
-    }
     }
   }
@@ -943,7 +962,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
     }

     FsVolumeReference volumeRef = null;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = volumes.getNextVolume(targetStorageType, block.getNumBytes());
     }
     try {
@@ -985,7 +1004,7 @@ private ReplicaInfo moveBlock(ExtendedBlock block, ReplicaInfo replicaInfo,
     newReplicaInfo.setNumBytes(blockFiles[1].length());
     // Finalize the copied files
     newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       // Increment numBlocks here as this block moved without knowing to BPS
       FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
       volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
@@ -1015,7 +1034,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi

     FsVolumeReference volumeRef = null;

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumeRef = destination.obtainReference();
     }

@@ -1143,41 +1162,44 @@ static private void truncateBlock(File blockFile, File metaFile,

   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler append(ExtendedBlock b,
+  public ReplicaHandler append(ExtendedBlock b,
       long newGS, long expectedBlockLen) throws IOException {
-    // If the block was successfully finalized because all packets
-    // were successfully processed at the Datanode but the ack for
-    // some of the packets were not received by the client. The client
-    // re-opens the connection and retries sending those packets.
-    // The other reason is that an "append" is occurring to this block.
-
-    // check the validity of the parameter
-    if (newGS < b.getGenerationStamp()) {
-      throw new IOException("The new generation stamp " + newGS +
-          " should be greater than the replica " + b + "'s generation stamp");
-    }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    LOG.info("Appending to " + replicaInfo);
-    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
-    }
-    if (replicaInfo.getNumBytes() != expectedBlockLen) {
-      throw new IOException("Corrupted replica " + replicaInfo +
-          " with a length of " + replicaInfo.getNumBytes() +
-          " expected length is " + expectedBlockLen);
-    }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block was successfully finalized because all packets
+      // were successfully processed at the Datanode but the ack for
+      // some of the packets were not received by the client. The client
+      // re-opens the connection and retries sending those packets.
+      // The other reason is that an "append" is occurring to this block.

-    FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
-    ReplicaBeingWritten replica = null;
-    try {
-      replica = append(b.getBlockPoolId(), (FinalizedReplica)replicaInfo, newGS,
-          b.getNumBytes());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check the validity of the parameter
+      if (newGS < b.getGenerationStamp()) {
+        throw new IOException("The new generation stamp " + newGS +
+            " should be greater than the replica " + b + "'s generation stamp");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      LOG.info("Appending to " + replicaInfo);
+      if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
+      }
+      if (replicaInfo.getNumBytes() != expectedBlockLen) {
+        throw new IOException("Corrupted replica " + replicaInfo +
+            " with a length of " + replicaInfo.getNumBytes() +
+            " expected length is " + expectedBlockLen);
+      }
+
+      FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
+      ReplicaBeingWritten replica = null;
+      try {
+        replica = append(b.getBlockPoolId(),
+            (FinalizedReplica) replicaInfo, newGS,
+            b.getNumBytes());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(replica, ref);
     }
-    return new ReplicaHandler(replica, ref);
   }

   /** Append to a finalized replica
@@ -1192,66 +1214,68 @@ public synchronized ReplicaHandler append(ExtendedBlock b,
    * @throws IOException if moving the replica from finalized directory
    *         to rbw directory fails
    */
-  private synchronized ReplicaBeingWritten append(String bpid,
+  private ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    // If the block is cached, start uncaching it.
-    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // If the block is cached, start uncaching it.
+      cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());

-    // If there are any hardlinks to the block, break them. This ensures we are
-    // not appending to a file that is part of a previous/ directory.
-    replicaInfo.breakHardLinksIfNeeded();
-
-    // construct a RBW replica with the new GS
-    File blkfile = replicaInfo.getBlockFile();
-    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
-    if (v.getAvailable() < bytesReserved) {
-      throw new DiskOutOfSpaceException("Insufficient space for appending to "
-          + replicaInfo);
-    }
-    File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
-    File oldmeta = replicaInfo.getMetaFile();
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
-        replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
-        v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
-    File newmeta = newReplicaInfo.getMetaFile();
+      // If there are any hardlinks to the block, break them. This ensures we
+      // are not appending to a file that is part of a previous/ directory.
+      replicaInfo.breakHardLinksIfNeeded();

-    // rename meta file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
-    }
-    try {
-      NativeIO.renameTo(oldmeta, newmeta);
-    } catch (IOException e) {
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-          " Unable to move meta file " + oldmeta +
-          " to rbw dir " + newmeta, e);
-    }
-
-    // rename block file to rbw directory
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
-          + ", file length=" + blkfile.length());
-    }
-    try {
-      NativeIO.renameTo(blkfile, newBlkFile);
-    } catch (IOException e) {
-      try {
-        NativeIO.renameTo(newmeta, oldmeta);
-      } catch (IOException ex) {
-        LOG.warn("Cannot move meta file " + newmeta +
-            "back to the finalized directory " + oldmeta, ex);
+      // construct a RBW replica with the new GS
+      File blkfile = replicaInfo.getBlockFile();
+      FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+      long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+      if (v.getAvailable() < bytesReserved) {
+        throw new DiskOutOfSpaceException("Insufficient space for appending to "
+            + replicaInfo);
       }
-      throw new IOException("Block " + replicaInfo + " reopen failed. " +
-          " Unable to move block file " + blkfile +
-          " to rbw dir " + newBlkFile, e);
+      File newBlkFile = new File(v.getRbwDir(bpid), replicaInfo.getBlockName());
+      File oldmeta = replicaInfo.getMetaFile();
+      ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+          replicaInfo.getBlockId(), replicaInfo.getNumBytes(), newGS,
+          v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
+      File newmeta = newReplicaInfo.getMetaFile();
+
+      // rename meta file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+      }
+      try {
+        NativeIO.renameTo(oldmeta, newmeta);
+      } catch (IOException e) {
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move meta file " + oldmeta +
+            " to rbw dir " + newmeta, e);
+      }
+
+      // rename block file to rbw directory
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+            + ", file length=" + blkfile.length());
+      }
+      try {
+        NativeIO.renameTo(blkfile, newBlkFile);
+      } catch (IOException e) {
+        try {
+          NativeIO.renameTo(newmeta, oldmeta);
+        } catch (IOException ex) {
+          LOG.warn("Cannot move meta file " + newmeta +
+              "back to the finalized directory " + oldmeta, ex);
+        }
+        throw new IOException("Block " + replicaInfo + " reopen failed. " +
+            " Unable to move block file " + blkfile +
+            " to rbw dir " + newBlkFile, e);
+      }
+
+      // Replace finalized replica by a RBW replica in replicas map
+      volumeMap.add(bpid, newReplicaInfo);
+      v.reserveSpaceForReplica(bytesReserved);
+      return newReplicaInfo;
     }
-
-    // Replace finalized replica by a RBW replica in replicas map
-    volumeMap.add(bpid, newReplicaInfo);
-    v.reserveSpaceForReplica(bytesReserved);
-    return newReplicaInfo;
   }

   private static class MustStopExistingWriter extends Exception {
@@ -1321,7 +1345,7 @@ public ReplicaHandler recoverAppend(

     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);

           FsVolumeReference ref = replicaInfo.getVolume().obtainReference();
@@ -1353,7 +1377,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS,
     LOG.info("Recover failed close " + b);
     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           // check replica's state
           ReplicaInfo replicaInfo = recoverCheck(b, newGS, expectedBlockLen);
           // bump the replica's GS
@@ -1400,62 +1424,65 @@ private void bumpReplicaGS(ReplicaInfo replicaInfo,
   }

   @Override // FsDatasetSpi
-  public synchronized ReplicaHandler createRbw(
+  public ReplicaHandler createRbw(
       StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
       throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
-        b.getBlockId());
-    if (replicaInfo != null) {
-      throw new ReplicaAlreadyExistsException("Block " + b +
-          " already exists in state " + replicaInfo.getState() +
-          " and thus cannot be created.");
-    }
-    // create a new block
-    FsVolumeReference ref = null;
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getBlockId());
+      if (replicaInfo != null) {
+        throw new ReplicaAlreadyExistsException("Block " + b +
+            " already exists in state " + replicaInfo.getState() +
+            " and thus cannot be created.");
+      }
+      // create a new block
+      FsVolumeReference ref = null;

-    // Use ramdisk only if block size is a multiple of OS page size.
-    // This simplifies reservation for partially used replicas
-    // significantly.
-    if (allowLazyPersist &&
-        lazyWriter != null &&
-        b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
-        reserveLockedMemory(b.getNumBytes())) {
-      try {
-        // First try to place the block on a transient volume.
-        ref = volumes.getNextTransientVolume(b.getNumBytes());
-        datanode.getMetrics().incrRamDiskBlocksWrite();
-      } catch(DiskOutOfSpaceException de) {
-        // Ignore the exception since we just fall back to persistent storage.
-      } finally {
-        if (ref == null) {
-          cacheManager.release(b.getNumBytes());
+      // Use ramdisk only if block size is a multiple of OS page size.
+      // This simplifies reservation for partially used replicas
+      // significantly.
+      if (allowLazyPersist &&
+          lazyWriter != null &&
+          b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+          reserveLockedMemory(b.getNumBytes())) {
+        try {
+          // First try to place the block on a transient volume.
+          ref = volumes.getNextTransientVolume(b.getNumBytes());
+          datanode.getMetrics().incrRamDiskBlocksWrite();
+        } catch (DiskOutOfSpaceException de) {
+          // Ignore the exception since we just fall back to persistent storage.
+        } finally {
+          if (ref == null) {
+            cacheManager.release(b.getNumBytes());
+          }
         }
       }
+
+      if (ref == null) {
+        ref = volumes.getNextVolume(storageType, b.getNumBytes());
+      }
+
+      FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
+      // create an rbw file to hold block in the designated volume
+
+      if (allowLazyPersist && !v.isTransientStorage()) {
+        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+      }
+
+      File f;
+      try {
+        f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+
+      ReplicaBeingWritten newReplicaInfo =
+          new ReplicaBeingWritten(b.getBlockId(),
+          b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
+      volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
+      return new ReplicaHandler(newReplicaInfo, ref);
     }
-
-    if (ref == null) {
-      ref = volumes.getNextVolume(storageType, b.getNumBytes());
-    }
-
-    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
-    // create an rbw file to hold block in the designated volume
-
-    if (allowLazyPersist && !v.isTransientStorage()) {
-      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
-    }
-
-    File f;
-    try {
-      f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
-    }
-
-    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
-        b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
-    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    return new ReplicaHandler(newReplicaInfo, ref);
   }

   @Override // FsDatasetSpi
@@ -1466,7 +1493,7 @@ public ReplicaHandler recoverRbw(

     while (true) {
       try {
-        synchronized (this) {
+        try (AutoCloseableLock lock = datasetLock.acquire()) {
           ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(),
               b.getBlockId());
           // check the replica's state
@@ -1487,61 +1514,64 @@ public ReplicaHandler recoverRbw(
     }
   }

-  private synchronized ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
+  private ReplicaHandler recoverRbwImpl(ReplicaBeingWritten rbw,
       ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
       throws IOException {
-    // check generation stamp
-    long replicaGenerationStamp = rbw.getGenerationStamp();
-    if (replicaGenerationStamp < b.getGenerationStamp() ||
-        replicaGenerationStamp > newGS) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
-          ". Expected GS range is [" + b.getGenerationStamp() + ", " +
-          newGS + "].");
-    }
-
-    // check replica length
-    long bytesAcked = rbw.getBytesAcked();
-    long numBytes = rbw.getNumBytes();
-    if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd){
-      throw new ReplicaNotFoundException("Unmatched length replica " +
-          rbw + ": BytesAcked = " + bytesAcked +
-          " BytesRcvd = " + numBytes + " are not in the range of [" +
-          minBytesRcvd + ", " + maxBytesRcvd + "].");
-    }
-
-    FsVolumeReference ref = rbw.getVolume().obtainReference();
-    try {
-      // Truncate the potentially corrupt portion.
-      // If the source was client and the last node in the pipeline was lost,
-      // any corrupt data written after the acked length can go unnoticed.
-      if (numBytes > bytesAcked) {
-        final File replicafile = rbw.getBlockFile();
-        truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
-        rbw.setNumBytes(bytesAcked);
-        rbw.setLastChecksumAndDataLen(bytesAcked, null);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      // check generation stamp
+      long replicaGenerationStamp = rbw.getGenerationStamp();
+      if (replicaGenerationStamp < b.getGenerationStamp() ||
+          replicaGenerationStamp > newGS) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.UNEXPECTED_GS_REPLICA + b +
+            ". Expected GS range is [" + b.getGenerationStamp() + ", " +
+            newGS + "].");
       }

-      // bump the replica's generation stamp to newGS
-      bumpReplicaGS(rbw, newGS);
-    } catch (IOException e) {
-      IOUtils.cleanup(null, ref);
-      throw e;
+      // check replica length
+      long bytesAcked = rbw.getBytesAcked();
+      long numBytes = rbw.getNumBytes();
+      if (bytesAcked < minBytesRcvd || numBytes > maxBytesRcvd) {
+        throw new ReplicaNotFoundException("Unmatched length replica " +
+            rbw + ": BytesAcked = " + bytesAcked +
+            " BytesRcvd = " + numBytes + " are not in the range of [" +
+            minBytesRcvd + ", " + maxBytesRcvd + "].");
+      }
+
+      FsVolumeReference ref = rbw.getVolume().obtainReference();
+      try {
+        // Truncate the potentially corrupt portion.
+        // If the source was client and the last node in the pipeline was lost,
+        // any corrupt data written after the acked length can go unnoticed.
+        if (numBytes > bytesAcked) {
+          final File replicafile = rbw.getBlockFile();
+          truncateBlock(replicafile, rbw.getMetaFile(), numBytes, bytesAcked);
+          rbw.setNumBytes(bytesAcked);
+          rbw.setLastChecksumAndDataLen(bytesAcked, null);
+        }
+
+        // bump the replica's generation stamp to newGS
+        bumpReplicaGS(rbw, newGS);
+      } catch (IOException e) {
+        IOUtils.cleanup(null, ref);
+        throw e;
+      }
+      return new ReplicaHandler(rbw, ref);
     }
-    return new ReplicaHandler(rbw, ref);
   }

   @Override // FsDatasetSpi
-  public synchronized ReplicaInPipeline convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       final ExtendedBlock b) throws IOException {
-    final long blockId = b.getBlockId();
-    final long expectedGs = b.getGenerationStamp();
-    final long visible = b.getNumBytes();
-    LOG.info("Convert " + b + " from Temporary to RBW, visible length="
-        + visible);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = b.getBlockId();
+      final long expectedGs = b.getGenerationStamp();
+      final long visible = b.getNumBytes();
+      LOG.info("Convert " + b + " from Temporary to RBW, visible length="
+          + visible);
+
+      final ReplicaInPipeline temp;

-    final ReplicaInPipeline temp;
-    {
       // get replica
       final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), blockId);
       if (r == null) {
@@ -1553,43 +1583,44 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
         throw new ReplicaAlreadyExistsException(
             "r.getState() != ReplicaState.TEMPORARY, r=" + r);
       }
-      temp = (ReplicaInPipeline)r;
-    }
-    // check generation stamp
-    if (temp.getGenerationStamp() != expectedGs) {
-      throw new ReplicaAlreadyExistsException(
-          "temp.getGenerationStamp() != expectedGs = " + expectedGs
-          + ", temp=" + temp);
-    }
+      temp = (ReplicaInPipeline) r;

-    // TODO: check writer?
-    // set writer to the current thread
-    // temp.setWriter(Thread.currentThread());
+      // check generation stamp
+      if (temp.getGenerationStamp() != expectedGs) {
+        throw new ReplicaAlreadyExistsException(
+            "temp.getGenerationStamp() != expectedGs = " + expectedGs
+            + ", temp=" + temp);
+      }

-    // check length
-    final long numBytes = temp.getNumBytes();
-    if (numBytes < visible) {
-      throw new IOException(numBytes + " = numBytes < visible = "
-          + visible + ", temp=" + temp);
+      // TODO: check writer?
+      // set writer to the current thread
+      // temp.setWriter(Thread.currentThread());
+
+      // check length
+      final long numBytes = temp.getNumBytes();
+      if (numBytes < visible) {
+        throw new IOException(numBytes + " = numBytes < visible = "
+            + visible + ", temp=" + temp);
+      }
+      // check volume
+      final FsVolumeImpl v = (FsVolumeImpl) temp.getVolume();
+      if (v == null) {
+        throw new IOException("r.getVolume() = null, temp=" + temp);
+      }
+
+      // move block files to the rbw directory
+      BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
+      final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
+          bpslice.getRbwDir());
+      // create RBW
+      final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
+          blockId, numBytes, expectedGs,
+          v, dest.getParentFile(), Thread.currentThread(), 0);
+      rbw.setBytesAcked(visible);
+      // overwrite the RBW in the volume map
+      volumeMap.add(b.getBlockPoolId(), rbw);
+      return rbw;
     }
-    // check volume
-    final FsVolumeImpl v = (FsVolumeImpl)temp.getVolume();
-    if (v == null) {
-      throw new IOException("r.getVolume() = null, temp=" + temp);
-    }
-
-    // move block files to the rbw directory
-    BlockPoolSlice bpslice = v.getBlockPoolSlice(b.getBlockPoolId());
-    final File dest = moveBlockFiles(b.getLocalBlock(), temp.getBlockFile(),
-        bpslice.getRbwDir());
-    // create RBW
-    final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
-        blockId, numBytes, expectedGs,
-        v, dest.getParentFile(), Thread.currentThread(), 0);
-    rbw.setBytesAcked(visible);
-    // overwrite the RBW in the volume map
-    volumeMap.add(b.getBlockPoolId(), rbw);
-    return rbw;
   }

   @Override // FsDatasetSpi
@@ -1599,7 +1630,7 @@ public ReplicaHandler createTemporary(
     long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
     ReplicaInfo lastFoundReplicaInfo = null;
     do {
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
         ReplicaInfo currentReplicaInfo =
             volumeMap.get(b.getBlockPoolId(), b.getBlockId());
         if (currentReplicaInfo == lastFoundReplicaInfo) {
@@ -1678,72 +1709,82 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
    * Complete the block write!
    */
   @Override // FsDatasetSpi
-  public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
-    if (Thread.interrupted()) {
-      // Don't allow data modifications from interrupted threads
-      throw new IOException("Cannot finalize block from Interrupted Thread");
+  public void finalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      if (Thread.interrupted()) {
+        // Don't allow data modifications from interrupted threads
+        throw new IOException("Cannot finalize block from Interrupted Thread");
+      }
+      ReplicaInfo replicaInfo = getReplicaInfo(b);
+      if (replicaInfo.getState() == ReplicaState.FINALIZED) {
+        // this is legal, when recovery happens on a file that has
+        // been opened for append but never modified
+        return;
+      }
+      finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
-    ReplicaInfo replicaInfo = getReplicaInfo(b);
-    if (replicaInfo.getState() == ReplicaState.FINALIZED) {
-      // this is legal, when recovery happens on a file that has
-      // been opened for append but never modified
-      return;
-    }
-    finalizeReplica(b.getBlockPoolId(), replicaInfo);
   }

-  private synchronized FinalizedReplica finalizeReplica(String bpid,
+  private FinalizedReplica finalizeReplica(String bpid,
       ReplicaInfo replicaInfo) throws IOException {
-    FinalizedReplica newReplicaInfo = null;
-    if (replicaInfo.getState() == ReplicaState.RUR &&
-        ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
-          ReplicaState.FINALIZED) {
-      newReplicaInfo = (FinalizedReplica)
-          ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
-    } else {
-      FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
-      File f = replicaInfo.getBlockFile();
-      if (v == null) {
-        throw new IOException("No volume for temporary file " + f +
-            " for block " + replicaInfo);
-      }
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      FinalizedReplica newReplicaInfo = null;
+      if (replicaInfo.getState() == ReplicaState.RUR &&
+          ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica().getState()
+          == ReplicaState.FINALIZED) {
+        newReplicaInfo = (FinalizedReplica)
+            ((ReplicaUnderRecovery) replicaInfo).getOriginalReplica();
+      } else {
+        FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+        File f = replicaInfo.getBlockFile();
+        if (v == null) {
+          throw new IOException("No volume for temporary file " + f +
+              " for block " + replicaInfo);
+        }

-      File dest = v.addFinalizedBlock(
-          bpid, replicaInfo, f, replicaInfo.getBytesReserved());
-      newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
+        File dest = v.addFinalizedBlock(
+            bpid, replicaInfo, f, replicaInfo.getBytesReserved());
+        newReplicaInfo =
+            new FinalizedReplica(replicaInfo, v, dest.getParentFile());

-      if (v.isTransientStorage()) {
-        releaseLockedMemory(
-            replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
-            false);
-        ramDiskReplicaTracker.addReplica(
-            bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
-        datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        if (v.isTransientStorage()) {
+          releaseLockedMemory(
+              replicaInfo.getOriginalBytesReserved()
+                  - replicaInfo.getNumBytes(),
+              false);
+          ramDiskReplicaTracker.addReplica(
+              bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
+          datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
+        }
       }
+      volumeMap.add(bpid, newReplicaInfo);
+
+      return newReplicaInfo;
     }
-    volumeMap.add(bpid, newReplicaInfo);
-
-    return newReplicaInfo;
   }

   /**
    * Remove the temporary block file (if any)
    */
   @Override // FsDatasetSpi
-  public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
-    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
-        b.getLocalBlock());
-    if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
-      // remove from volumeMap
-      volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
+  public void unfinalizeBlock(ExtendedBlock b) throws IOException {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+          b.getLocalBlock());
+      if (replicaInfo != null
+          && replicaInfo.getState() == ReplicaState.TEMPORARY) {
+        // remove from volumeMap
+        volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());

-      // delete the on-disk temp file
-      if (delBlockFromDisk(replicaInfo.getBlockFile(),
-          replicaInfo.getMetaFile(), b.getLocalBlock())) {
-        LOG.warn("Block " + b + " unfinalized and removed. " );
-      }
-      if (replicaInfo.getVolume().isTransientStorage()) {
-        ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+        // delete the on-disk temp file
+        if (delBlockFromDisk(replicaInfo.getBlockFile(),
+            replicaInfo.getMetaFile(), b.getLocalBlock())) {
+          LOG.warn("Block " + b + " unfinalized and removed. ");
+        }
+        if (replicaInfo.getVolume().isTransientStorage()) {
+          ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(),
+              b.getBlockId(), true);
+        }
       }
     }
   }
@@ -1791,7 +1832,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
       builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
     }

-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
@@ -1824,31 +1865,36 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }

   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
-    ArrayList<FinalizedReplica> finalized =
-        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
-    for (ReplicaInfo b : volumeMap.replicas(bpid)) {
-      if(!b.getVolume().isTransientStorage() &&
-         b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new FinalizedReplica((FinalizedReplica)b));
+  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(
+      String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      ArrayList<FinalizedReplica> finalized =
+          new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        if (!b.getVolume().isTransientStorage() &&
+            b.getState() == ReplicaState.FINALIZED) {
+          finalized.add(new FinalizedReplica((FinalizedReplica) b));
+        }
       }
+      return finalized;
     }
-    return finalized;
   }

   /**
@@ -1924,7 +1970,7 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
   File validateBlockFile(String bpid, long blockId) {
     //Should we check for metadata file too?
     final File f;
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       f = getFile(bpid, blockId, false);
     }

@@ -1973,7 +2019,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
     for (int i = 0; i < invalidBlks.length; i++) {
       final File f;
       final FsVolumeImpl v;
-      synchronized (this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
        final ReplicaInfo info = volumeMap.get(bpid, invalidBlks[i]);
        if (info == null) {
          // It is okay if the block is not found -- it may be deleted earlier.
@@ -2084,7 +2130,7 @@ private void cacheBlock(String bpid, long blockId) {
     long length, genstamp;
     Executor volumeExecutor;

-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ReplicaInfo info = volumeMap.get(bpid, blockId);
       boolean success = false;
       try {
@@ -2151,9 +2197,11 @@ public boolean isCached(String bpid, long blockId) {
   }

   @Override // FsDatasetSpi
-  public synchronized boolean contains(final ExtendedBlock block) {
-    final long blockId = block.getLocalBlock().getBlockId();
-    return getFile(block.getBlockPoolId(), blockId, false) != null;
+  public boolean contains(final ExtendedBlock block) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final long blockId = block.getLocalBlock().getBlockId();
+      return getFile(block.getBlockPoolId(), blockId, false) != null;
+    }
   }

   /**
@@ -2279,7 +2327,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
       File diskMetaFile, FsVolumeSpi vol) throws IOException {
     Block corruptBlock = null;
     ReplicaInfo memBlockInfo;
-    synchronized (this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       memBlockInfo = volumeMap.get(bpid, blockId);
       if (memBlockInfo != null && memBlockInfo.getState() != ReplicaState.FINALIZED) {
         // Block is not finalized - ignore the difference
@@ -2435,9 +2483,11 @@ public ReplicaInfo getReplica(String bpid, long blockId) {
   }

   @Override
-  public synchronized String getReplicaString(String bpid, long blockId) {
-    final Replica r = volumeMap.get(bpid, blockId);
-    return r == null? "null": r.toString();
+  public String getReplicaString(String bpid, long blockId) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica r = volumeMap.get(bpid, blockId);
+      return r == null ? "null" : r.toString();
+    }
   }

   @Override // FsDatasetSpi
@@ -2530,67 +2580,69 @@ static ReplicaRecoveryInfo initReplicaRecoveryImpl(String bpid, ReplicaMap map,
   }

   @Override // FsDatasetSpi
-  public synchronized Replica updateReplicaUnderRecovery(
+  public Replica updateReplicaUnderRecovery(
       final ExtendedBlock oldBlock,
       final long recoveryId,
       final long newBlockId,
       final long newlength) throws IOException {
-    //get replica
-    final String bpid = oldBlock.getBlockPoolId();
-    final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
-    LOG.info("updateReplica: " + oldBlock
-        + ", recoveryId=" + recoveryId
-        + ", length=" + newlength
-        + ", replica=" + replica);
-
-    //check replica
-    if (replica == null) {
-      throw new ReplicaNotFoundException(oldBlock);
-    }
-
-    //check replica state
-    if (replica.getState() != ReplicaState.RUR) {
-      throw new IOException("replica.getState() != " + ReplicaState.RUR
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      //get replica
+      final String bpid = oldBlock.getBlockPoolId();
+      final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
+      LOG.info("updateReplica: " + oldBlock
+          + ", recoveryId=" + recoveryId
+          + ", length=" + newlength
          + ", replica=" + replica);
+
+      //check replica
+      if (replica == null) {
+        throw new ReplicaNotFoundException(oldBlock);
+      }
+
+      //check replica state
+      if (replica.getState() != ReplicaState.RUR) {
+        throw new IOException("replica.getState() != " + ReplicaState.RUR
+            + ", replica=" + replica);
+      }
+
+      //check replica's byte on disk
+      if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
+        throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
+            + " replica.getBytesOnDisk() != block.getNumBytes(), block="
+            + oldBlock + ", replica=" + replica);
+      }
+
+      //check replica files before update
+      checkReplicaFiles(replica);
+
+      //update replica
+      final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
+          .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+          newBlockId, newlength);
+
+      boolean copyTruncate = newBlockId != oldBlock.getBlockId();
+      if (!copyTruncate) {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == recoveryId
+            && finalized.getNumBytes() == newlength
+            : "Replica information mismatched: oldBlock=" + oldBlock
+                + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+                + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
+      } else {
+        assert finalized.getBlockId() == oldBlock.getBlockId()
+            && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
+            && finalized.getNumBytes() == oldBlock.getNumBytes()
+            : "Finalized and old information mismatched: oldBlock=" + oldBlock
+                + ", genStamp=" + oldBlock.getGenerationStamp()
+                + ", len=" + oldBlock.getNumBytes()
+                + ", finalized=" + finalized;
+      }
+
+      //check replica files after update
+      checkReplicaFiles(finalized);
+
+      return finalized;
     }
-
-    //check replica's byte on disk
-    if (replica.getBytesOnDisk() != oldBlock.getNumBytes()) {
-      throw new IOException("THIS IS NOT SUPPOSED TO HAPPEN:"
-          + " replica.getBytesOnDisk() != block.getNumBytes(), block="
-          + oldBlock + ", replica=" + replica);
-    }
-
-    //check replica files before update
-    checkReplicaFiles(replica);
-
-    //update replica
-    final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
-        .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
-        newBlockId, newlength);
-
-    boolean copyTruncate = newBlockId != oldBlock.getBlockId();
-    if(!copyTruncate) {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == recoveryId
-          && finalized.getNumBytes() == newlength
-          : "Replica information mismatched: oldBlock=" + oldBlock
-              + ", recoveryId=" + recoveryId + ", newlength=" + newlength
-              + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
-    } else {
-      assert finalized.getBlockId() == oldBlock.getBlockId()
-          && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
-          && finalized.getNumBytes() == oldBlock.getNumBytes()
-          : "Finalized and old information mismatched: oldBlock=" + oldBlock
-              + ", genStamp=" + oldBlock.getGenerationStamp()
-              + ", len=" + oldBlock.getNumBytes()
-              + ", finalized=" + finalized;
-    }
-
-    //check replica files after update
-    checkReplicaFiles(finalized);
-
-    return finalized;
   }

   private FinalizedReplica updateReplicaUnderRecovery(
@@ -2668,23 +2720,25 @@ private File[] copyReplicaWithNewBlockIdAndGS(
   }

   @Override // FsDatasetSpi
-  public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
+  public long getReplicaVisibleLength(final ExtendedBlock block)
       throws IOException {
-    final Replica replica = getReplicaInfo(block.getBlockPoolId(),
-        block.getBlockId());
-    if (replica.getGenerationStamp() < block.getGenerationStamp()) {
-      throw new IOException(
-          "replica.getGenerationStamp() < block.getGenerationStamp(), block="
-          + block + ", replica=" + replica);
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      final Replica replica = getReplicaInfo(block.getBlockPoolId(),
+          block.getBlockId());
+      if (replica.getGenerationStamp() < block.getGenerationStamp()) {
+        throw new IOException(
+            "replica.getGenerationStamp() < block.getGenerationStamp(), block="
+            + block + ", replica=" + replica);
+      }
+      return replica.getVisibleLength();
     }
-    return replica.getVisibleLength();
   }

   @Override
   public void addBlockPool(String bpid, Configuration conf)
       throws IOException {
     LOG.info("Adding block pool " + bpid);
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
@@ -2692,11 +2746,14 @@ public void addBlockPool(String bpid, Configuration conf)
   }

   @Override
-  public synchronized void shutdownBlockPool(String bpid) {
-    LOG.info("Removing block pool " + bpid);
-    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
-    volumeMap.cleanUpBlockPool(bpid);
-    volumes.removeBlockPool(bpid, blocksPerVolume);
+  public void shutdownBlockPool(String bpid) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      LOG.info("Removing block pool " + bpid);
+      Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume
+          = getBlockReports(bpid);
+      volumeMap.cleanUpBlockPool(bpid);
+      volumes.removeBlockPool(bpid, blocksPerVolume);
+    }
   }

   /**
@@ -2759,35 +2816,38 @@ public Map<String, Object> getVolumeInfoMap() {
   }

   @Override //FsDatasetSpi
-  public synchronized void deleteBlockPool(String bpid, boolean force)
+  public void deleteBlockPool(String bpid, boolean force)
       throws IOException {
-    List<FsVolumeImpl> curVolumes = volumes.getVolumes();
-    if (!force) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      List<FsVolumeImpl> curVolumes = volumes.getVolumes();
+      if (!force) {
+        for (FsVolumeImpl volume : curVolumes) {
+          try (FsVolumeReference ref = volume.obtainReference()) {
+            if (!volume.isBPDirEmpty(bpid)) {
+              LOG.warn(bpid
+                  + " has some block files, cannot delete unless forced");
+              throw new IOException("Cannot delete block pool, "
+                  + "it contains some block files");
+            }
+          } catch (ClosedChannelException e) {
+            // ignore.
+          }
+        }
+      }
       for (FsVolumeImpl volume : curVolumes) {
         try (FsVolumeReference ref = volume.obtainReference()) {
-          if (!volume.isBPDirEmpty(bpid)) {
-            LOG.warn(bpid + " has some block files, cannot delete unless forced");
-            throw new IOException("Cannot delete block pool, "
-                + "it contains some block files");
-          }
+          volume.deleteBPDirectories(bpid, force);
         } catch (ClosedChannelException e) {
           // ignore.
         }
       }
     }
-    for (FsVolumeImpl volume : curVolumes) {
-      try (FsVolumeReference ref = volume.obtainReference()) {
-        volume.deleteBPDirectories(bpid, force);
-      } catch (ClosedChannelException e) {
-        // ignore.
-      }
-    }
   }

   @Override // FsDatasetSpi
   public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
       throws IOException {
-    synchronized(this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       final Replica replica = volumeMap.get(block.getBlockPoolId(),
           block.getBlockId());
       if (replica == null) {
@@ -2838,7 +2898,7 @@ public void clearRollingUpgradeMarker(String bpid) throws IOException {
   @Override
   public void onCompleteLazyPersist(String bpId, long blockId,
       long creationTime, File[] savedFiles, FsVolumeImpl targetVolume) {
-    synchronized (FsDatasetImpl.this) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
       ramDiskReplicaTracker.recordEndLazyPersist(bpId, blockId, savedFiles);

       targetVolume.incDfsUsedAndNumBlocks(bpId, savedFiles[0].length()
@@ -2972,7 +3032,7 @@ private boolean saveNextReplica() {
       try {
         block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
         if (block != null) {
-          synchronized (FsDatasetImpl.this) {
+          try (AutoCloseableLock lock = datasetLock.acquire()) {
             replicaInfo = volumeMap.get(block.getBlockPoolId(), block.getBlockId());

             // If replicaInfo is null, the block was either deleted before
@@ -3042,7 +3102,7 @@ public void evictBlocks(long bytesNeeded) throws IOException {
       long blockFileUsed, metaFileUsed;
       final String bpid = replicaState.getBlockPoolId();

-      synchronized (FsDatasetImpl.this) {
+      try (AutoCloseableLock lock = datasetLock.acquire()) {
        replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
            replicaState.getBlockId());
        Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
@@ -3219,14 +3279,17 @@ public void setTimer(Timer newTimer) {
     this.timer = newTimer;
   }

-  synchronized void stopAllDataxceiverThreads(FsVolumeImpl volume) {
-    for (String blockPoolId : volumeMap.getBlockPoolList()) {
-      Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
-      for (ReplicaInfo replicaInfo : replicas) {
-        if (replicaInfo instanceof ReplicaInPipeline
-            && replicaInfo.getVolume().equals(volume)) {
-          ReplicaInPipeline replicaInPipeline = (ReplicaInPipeline) replicaInfo;
-          replicaInPipeline.interruptThread();
+  void stopAllDataxceiverThreads(FsVolumeImpl volume) {
+    try (AutoCloseableLock lock = datasetLock.acquire()) {
+      for (String blockPoolId : volumeMap.getBlockPoolList()) {
+        Collection<ReplicaInfo> replicas = volumeMap.replicas(blockPoolId);
+        for (ReplicaInfo replicaInfo : replicas) {
+          if (replicaInfo instanceof ReplicaInPipeline
+              && replicaInfo.getVolume().equals(volume)) {
+            ReplicaInPipeline replicaInPipeline
+                = (ReplicaInPipeline) replicaInfo;
+            replicaInPipeline.interruptThread();
+          }
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 4a446d4e95..59c9ed4633 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -304,7 +305,7 @@ void onMetaFileDeletion(String bpid, long value) {

   private void decDfsUsedAndNumBlocks(String bpid, long value,
       boolean blockFileDeleted) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.decDfsUsed(value);
@@ -316,7 +317,7 @@ private void decDfsUsedAndNumBlocks(String bpid, long value,
   }

   void incDfsUsedAndNumBlocks(String bpid, long value) {
-    synchronized (dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -326,7 +327,7 @@ void incDfsUsedAndNumBlocks(String bpid, long value) {
   }

   void incDfsUsed(String bpid, long value) {
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       BlockPoolSlice bp = bpSlices.get(bpid);
       if (bp != null) {
         bp.incDfsUsed(value);
@@ -337,7 +338,7 @@ void incDfsUsed(String bpid, long value) {
   @VisibleForTesting
   public long getDfsUsed() throws IOException {
     long dfsUsed = 0;
-    synchronized(dataset) {
+    try(AutoCloseableLock lock = dataset.acquireDatasetLock()) {
       for(BlockPoolSlice s : bpSlices.values()) {
         dfsUsed += s.getDfsUsed();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 05652608d2..c0c76ad7fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -39,6 +39,7 @@
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -115,6 +116,9 @@ public static byte simulatedByte(Block b, long offsetInBlk) {
       DatanodeStorage.State.NORMAL;

   static final byte[] nullCrcFileData;
+
+  private final AutoCloseableLock datasetLock;
+
   static {
     DataChecksum checksum = DataChecksum.newDataChecksum(
         DataChecksum.Type.NULL, 16*1024 );
@@ -550,6 +554,7 @@ public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration
         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY),
         conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE));
     this.volume = new SimulatedVolume(this.storage);
+    this.datasetLock = new AutoCloseableLock();
   }

   public synchronized void injectBlocks(String bpid,
@@ -1366,5 +1371,9 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
     return null;
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return datasetLock.acquire();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 42e80fa8f6..8183de8408 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -66,6 +66,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -725,7 +726,7 @@ public void run() {
         final RecoveringBlock recoveringBlock = new RecoveringBlock(
             block.getBlock(), locations, block.getBlock()
                 .getGenerationStamp() + 1);
-        synchronized (dataNode.data) {
+        try(AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) {
           Thread.sleep(2000);
           dataNode.initReplicaRecovery(recoveringBlock);
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 48d468156d..3822bad735 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -113,7 +114,7 @@ private List<LocatedBlock> createFile(String fileNamePrefix,

   /** Truncate a block file */
   private long truncateBlockFile() throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -138,7 +139,7 @@ private long truncateBlockFile() throws IOException {

   /** Delete a block file */
   private long deleteBlockFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File f = b.getBlockFile();
         File mf = b.getMetaFile();
@@ -154,7 +155,7 @@ private long deleteBlockFile() {

   /** Delete block meta file */
   private long deleteMetaFile() {
-    synchronized(fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
         File file = b.getMetaFile();
         // Delete a metadata file
@@ -173,7 +174,7 @@ private long deleteMetaFile() {
    * @throws IOException
    */
   private void duplicateBlock(long blockId) throws IOException {
-    synchronized (fds) {
+    try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       ReplicaInfo b = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
       try (FsDatasetSpi.FsVolumeReferences volumes =
           fds.getFsVolumeReferences()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 8518ddd2a9..b8564bb2c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -23,6 +23,7 @@

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -450,4 +451,8 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block,
     return null;
   }

+  @Override
+  public AutoCloseableLock acquireDatasetLock() {
+    return null;
+  }
 }
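Note on the pattern this patch applies throughout: every synchronized(dataset) block becomes a try-with-resources over the dataset's new acquireDatasetLock(). The sketch below is a minimal, stand-alone illustration of that idiom under stated assumptions — CloseableLock and Example are hypothetical names invented here, not Hadoop classes; the patch itself uses org.apache.hadoop.util.AutoCloseableLock, which follows the same shape.

    import java.util.concurrent.locks.ReentrantLock;

    // A lock that implements AutoCloseable so callers can hold it with
    // try-with-resources instead of synchronized blocks.
    class CloseableLock implements AutoCloseable {
      private final ReentrantLock lock = new ReentrantLock();

      // Take the lock and return this, so the call can sit in a try header.
      public CloseableLock acquire() {
        lock.lock();
        return this;
      }

      // Invoked automatically when the try block exits, even on exceptions.
      @Override
      public void close() {
        lock.unlock();
      }
    }

    class Example {
      private final CloseableLock datasetLock = new CloseableLock();

      void doWork() {
        // Equivalent in effect to synchronized(this) { ... }, but taken
        // against an explicit lock object, mirroring the FsDatasetImpl
        // changes above.
        try (CloseableLock l = datasetLock.acquire()) {
          // critical section
        }
      }
    }

The behavior is unchanged for now; the gain is that the lock is an explicit, swappable object rather than the dataset's monitor, so its implementation can later be instrumented, made fair, or split without touching every call site.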