diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 3a0a6782ba..9de33ff60a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -606,6 +606,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_LOCK_FAIR_KEY = "dfs.datanode.lock.fair"; public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true; + public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY = + "dfs.datanode.lock.read.write.enabled"; + public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT = + true; public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY = "dfs.datanode.lock-reporting-threshold-ms"; public static final long diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 6102a592c2..b396bf9705 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -255,7 +255,7 @@ class BlockSender implements java.io.Closeable { // the append write. ChunkChecksum chunkChecksum = null; final long replicaVisibleLength; - try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) { + try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) { replica = getReplica(block, datanode); replicaVisibleLength = replica.getVisibleLength(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 2e498e4750..e242cc826d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3060,7 +3060,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b, final BlockConstructionStage stage; //get replica information - try(AutoCloseableLock lock = data.acquireDatasetLock()) { + try(AutoCloseableLock lock = data.acquireDatasetReadLock()) { Block storedBlock = data.getStoredBlock(b.getBlockPoolId(), b.getBlockId()); if (null == storedBlock) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 35625ce121..b2e521c695 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -473,7 +473,7 @@ private void scan() { blockPoolReport.sortBlocks(); // Hold FSDataset lock to prevent further changes to the block map - try (AutoCloseableLock lock = dataset.acquireDatasetLock()) { + try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) { for (final String bpid : blockPoolReport.getBlockPoolIds()) { List blockpoolReport = blockPoolReport.getScanInfo(bpid); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index 958c0cfeeb..ac10e8f249 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -504,7 +504,7 @@ private Map getStorageIDToVolumeBasePathMap() Map storageIDToVolBasePathMap = new HashMap<>(); FsDatasetSpi.FsVolumeReferences references; try { - try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) { + try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) { references = this.dataset.getFsVolumeReferences(); for (int ndx = 0; ndx < references.size(); ndx++) { FsVolumeSpi vol = references.get(ndx); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 2e5135d841..177c62e017 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -657,12 +657,16 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, FsVolumeSpi destination) throws IOException; /** - * Acquire the lock of the data set. + * Acquire the lock of the data set. This prevents other threads from + * modifying the volume map structure inside the datanode, but other changes + * are still possible. For example modifying the genStamp of a block instance. */ AutoCloseableLock acquireDatasetLock(); /*** - * Acquire the read lock of the data set. + * Acquire the read lock of the data set. This prevents other threads from + * modifying the volume map structure inside the datanode, but other changes + * are still possible. For example modifying the genStamp of a block instance. * @return The AutoClosable read lock instance. */ AutoCloseableLock acquireDatasetReadLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index a083012a2c..de898e9343 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -183,7 +183,7 @@ public StorageReport[] getStorageReports(String bpid) @Override public FsVolumeImpl getVolume(final ExtendedBlock b) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); return r != null ? (FsVolumeImpl) r.getVolume() : null; @@ -193,7 +193,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) { @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { ReplicaInfo r = volumeMap.get(bpid, blkid); if (r == null) { return null; @@ -206,7 +206,7 @@ public Block getStoredBlock(String bpid, long blkid) public Set deepCopyReplica(String bpid) throws IOException { Set replicas = null; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections. EMPTY_SET : volumeMap.replicas(bpid)); } @@ -302,7 +302,20 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT, TimeUnit.MILLISECONDS)); this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock()); - this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock()); + boolean enableRL = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT); + // The read lock can be disabled by the above config key. If it is disabled + // then we simply make the both the read and write lock variables hold + // the write lock. All accesses to the lock are via these variables, so that + // effectively disables the read lock. + if (enableRL) { + LOG.info("The datanode lock is a read write lock"); + this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock()); + } else { + LOG.info("The datanode lock is an exclusive write lock"); + this.datasetReadLock = this.datasetWriteLock; + } this.datasetWriteLockCondition = datasetWriteLock.newCondition(); // The number of volumes required for operation is the total number @@ -342,7 +355,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b) } storageMap = new ConcurrentHashMap(); - volumeMap = new ReplicaMap(datasetRWLock); + volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock); ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this); @SuppressWarnings("unchecked") @@ -475,7 +488,8 @@ private void addVolume(Storage.StorageDirectory sd) throws IOException { .setConf(this.conf) .build(); FsVolumeReference ref = fsVolume.obtainReference(); - ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock); + ReplicaMap tempVolumeMap = + new ReplicaMap(datasetReadLock, datasetWriteLock); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref); @@ -810,7 +824,7 @@ public InputStream getBlockInputStream(ExtendedBlock b, long seekOffset) throws IOException { ReplicaInfo info; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock()); } @@ -898,7 +912,7 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid) @Override // FsDatasetSpi public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkOffset, long metaOffset) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { ReplicaInfo info = getReplicaInfo(b); FsVolumeReference ref = info.getVolume().obtainReference(); try { @@ -1023,7 +1037,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, } FsVolumeReference volumeRef = null; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId, block.getNumBytes()); } @@ -1137,7 +1151,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi FsVolumeReference volumeRef = null; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { volumeRef = destination.obtainReference(); } @@ -1930,7 +1944,7 @@ public Map getBlockReports(String bpid) { new HashMap(); List curVolumes = null; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { curVolumes = volumes.getVolumes(); for (FsVolumeSpi v : curVolumes) { builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength)); @@ -1989,7 +2003,7 @@ public Map getBlockReports(String bpid) { */ @Override public List getFinalizedBlocks(String bpid) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final List finalized = new ArrayList( volumeMap.size(bpid)); for (ReplicaInfo b : volumeMap.replicas(bpid)) { @@ -2082,9 +2096,7 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) { ReplicaInfo validateBlockFile(String bpid, long blockId) { //Should we check for metadata file too? final ReplicaInfo r; - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { - r = volumeMap.get(bpid, blockId); - } + r = volumeMap.get(bpid, blockId); if (r != null) { if (r.blockDataExists()) { return r; @@ -2327,7 +2339,7 @@ public boolean isCached(String bpid, long blockId) { @Override // FsDatasetSpi public boolean contains(final ExtendedBlock block) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final long blockId = block.getLocalBlock().getBlockId(); final String bpid = block.getBlockPoolId(); final ReplicaInfo r = volumeMap.get(bpid, blockId); @@ -2655,7 +2667,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) { @Override public String getReplicaString(String bpid, long blockId) { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final Replica r = volumeMap.get(bpid, blockId); return r == null ? "null" : r.toString(); } @@ -2882,7 +2894,7 @@ private ReplicaInfo updateReplicaUnderRecovery( @Override // FsDatasetSpi public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final Replica replica = getReplicaInfo(block.getBlockPoolId(), block.getBlockId()); if (replica.getGenerationStamp() < block.getGenerationStamp()) { @@ -3032,18 +3044,20 @@ public void deleteBlockPool(String bpid, boolean force) @Override // FsDatasetSpi public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block) throws IOException { - try (AutoCloseableLock lock = datasetWriteLock.acquire()) { + try (AutoCloseableLock lock = datasetReadLock.acquire()) { final Replica replica = volumeMap.get(block.getBlockPoolId(), block.getBlockId()); if (replica == null) { throw new ReplicaNotFoundException(block); } - if (replica.getGenerationStamp() < block.getGenerationStamp()) { - throw new IOException( - "Replica generation stamp < block generation stamp, block=" - + block + ", replica=" + replica); - } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { - block.setGenerationStamp(replica.getGenerationStamp()); + synchronized(replica) { + if (replica.getGenerationStamp() < block.getGenerationStamp()) { + throw new IOException( + "Replica generation stamp < block generation stamp, block=" + + block + ", replica=" + replica); + } else if (replica.getGenerationStamp() > block.getGenerationStamp()) { + block.setGenerationStamp(replica.getGenerationStamp()); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java index df14f2aad0..5dfcc77174 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java @@ -33,7 +33,6 @@ * Maintains the replica map. */ class ReplicaMap { - private final ReadWriteLock rwLock; // Lock object to synchronize this instance. private final AutoCloseableLock readLock; private final AutoCloseableLock writeLock; @@ -53,18 +52,22 @@ public int compare(Object o1, Object o2) { } }; - ReplicaMap(ReadWriteLock lock) { - if (lock == null) { + ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) { + if (readLock == null || writeLock == null) { throw new HadoopIllegalArgumentException( "Lock to synchronize on cannot be null"); } - this.rwLock = lock; - this.readLock = new AutoCloseableLock(rwLock.readLock()); - this.writeLock = new AutoCloseableLock(rwLock.writeLock()); + this.readLock = readLock; + this.writeLock = writeLock; + } + + ReplicaMap(ReadWriteLock lock) { + this(new AutoCloseableLock(lock.readLock()), + new AutoCloseableLock(lock.writeLock())); } String[] getBlockPoolList() { - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = readLock.acquire()) { return map.keySet().toArray(new String[map.keySet().size()]); } } @@ -109,7 +112,7 @@ ReplicaInfo get(String bpid, Block block) { */ ReplicaInfo get(String bpid, long blockId) { checkBlockPool(bpid); - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = readLock.acquire()) { FoldedTreeSet set = map.get(bpid); if (set == null) { return null; @@ -235,7 +238,7 @@ ReplicaInfo remove(String bpid, long blockId) { * @return the number of replicas in the map */ int size(String bpid) { - try (AutoCloseableLock l = writeLock.acquire()) { + try (AutoCloseableLock l = readLock.acquire()) { FoldedTreeSet set = map.get(bpid); return set != null ? set.size() : 0; } @@ -281,4 +284,14 @@ void cleanUpBlockPool(String bpid) { AutoCloseableLock getLock() { return writeLock; } + + /** + * Get the lock object used for synchronizing the ReplicasMap for read only + * operations. + * @return The read lock object + */ + AutoCloseableLock getReadLock() { + return readLock; + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1e8490addb..689ecfe2f3 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3250,6 +3250,19 @@ + + dfs.datanode.lock.read.write.enabled + true + If this is true, the FsDataset lock will be a read write lock. If + it is false, all locks will be a write lock. + Enabling this should give better datanode throughput, as many read only + functions can run concurrently under the read lock, when they would + previously have required the exclusive write lock. As the feature is + experimental, this switch can be used to disable the shared read lock, and + cause all lock acquisitions to use the exclusive write lock. + + + dfs.datanode.lock-reporting-threshold-ms 300 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 273feb0491..8b445c5a51 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.StringUtils; import org.junit.Assert; @@ -84,6 +85,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; @@ -198,6 +200,101 @@ public void setUp() throws IOException { assertEquals(0, dataset.getNumFailedVolumes()); } + @Test + public void testReadLockEnabledByDefault() + throws IOException, InterruptedException { + final FsDatasetSpi ds = dataset; + AtomicBoolean accessed = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch waiterLatch = new CountDownLatch(1); + + Thread holder = new Thread() { + public void run() { + try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { + latch.countDown(); + sleep(10000); + } catch (Exception e) { + } + } + }; + + Thread waiter = new Thread() { + public void run() { + try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { + waiterLatch.countDown(); + accessed.getAndSet(true); + } catch (Exception e) { + } + } + }; + + holder.start(); + latch.await(); + waiter.start(); + waiterLatch.await(); + // The holder thread is still holding the lock, but the waiter can still + // run as the lock is a shared read lock. + assertEquals(true, accessed.get()); + holder.interrupt(); + holder.join(); + waiter.join(); + } + + @Test(timeout=10000) + public void testReadLockCanBeDisabledByConfig() + throws IOException, InterruptedException { + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean( + DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + try { + cluster.waitActive(); + DataNode dn = cluster.getDataNodes().get(0); + final FsDatasetSpi ds = DataNodeTestUtils.getFSDataset(dn); + + CountDownLatch latch = new CountDownLatch(1); + CountDownLatch waiterLatch = new CountDownLatch(1); + AtomicBoolean accessed = new AtomicBoolean(false); + + Thread holder = new Thread() { + public void run() { + try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { + latch.countDown(); + sleep(10000); + } catch (Exception e) { + } + } + }; + + Thread waiter = new Thread() { + public void run() { + try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { + accessed.getAndSet(true); + waiterLatch.countDown(); + } catch (Exception e) { + } + } + }; + + holder.start(); + latch.await(); + waiter.start(); + Thread.sleep(200); + // Waiting thread should not have been able to update the variable + // as the read lock is disabled and hence an exclusive lock. + assertEquals(false, accessed.get()); + holder.interrupt(); + holder.join(); + waiterLatch.await(); + // After the holder thread exits, the variable is updated. + assertEquals(true, accessed.get()); + waiter.join(); + } finally { + cluster.shutdown(); + } + } + @Test public void testAddVolumes() throws IOException { final int numNewVolumes = 3; @@ -244,8 +341,8 @@ public void testAddVolumes() throws IOException { @Test public void testAddVolumeWithSameStorageUuid() throws IOException { - HdfsConfiguration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + HdfsConfiguration config = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(config) .numDataNodes(1).build(); try { cluster.waitActive();