HDFS-15160. ReplicaMap, Disk Balancer, Directory Scanner and various FsDatasetImpl methods should use datanode readlock. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Stephen O'Donnell 2020-06-30 07:09:26 -07:00 committed by Wei-Chiu Chuang
parent 4249c04d45
commit 2a67e2b1a0
10 changed files with 187 additions and 42 deletions

View File

@ -606,6 +606,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_LOCK_FAIR_KEY =
"dfs.datanode.lock.fair";
public static final boolean DFS_DATANODE_LOCK_FAIR_DEFAULT = true;
public static final String DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY =
"dfs.datanode.lock.read.write.enabled";
public static final Boolean DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT =
true;
public static final String DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.datanode.lock-reporting-threshold-ms";
public static final long

View File

@ -255,7 +255,7 @@ class BlockSender implements java.io.Closeable {
// the append write.
ChunkChecksum chunkChecksum = null;
final long replicaVisibleLength;
try(AutoCloseableLock lock = datanode.data.acquireDatasetLock()) {
try(AutoCloseableLock lock = datanode.data.acquireDatasetReadLock()) {
replica = getReplica(block, datanode);
replicaVisibleLength = replica.getVisibleLength();
}

View File

@ -3060,7 +3060,7 @@ void transferReplicaForPipelineRecovery(final ExtendedBlock b,
final BlockConstructionStage stage;
//get replica information
try(AutoCloseableLock lock = data.acquireDatasetLock()) {
try(AutoCloseableLock lock = data.acquireDatasetReadLock()) {
Block storedBlock = data.getStoredBlock(b.getBlockPoolId(),
b.getBlockId());
if (null == storedBlock) {

View File

@ -473,7 +473,7 @@ private void scan() {
blockPoolReport.sortBlocks();
// Hold FSDataset lock to prevent further changes to the block map
try (AutoCloseableLock lock = dataset.acquireDatasetLock()) {
try (AutoCloseableLock lock = dataset.acquireDatasetReadLock()) {
for (final String bpid : blockPoolReport.getBlockPoolIds()) {
List<ScanInfo> blockpoolReport = blockPoolReport.getScanInfo(bpid);

View File

@ -504,7 +504,7 @@ private Map<String, String> getStorageIDToVolumeBasePathMap()
Map<String, String> storageIDToVolBasePathMap = new HashMap<>();
FsDatasetSpi.FsVolumeReferences references;
try {
try(AutoCloseableLock lock = this.dataset.acquireDatasetLock()) {
try(AutoCloseableLock lock = this.dataset.acquireDatasetReadLock()) {
references = this.dataset.getFsVolumeReferences();
for (int ndx = 0; ndx < references.size(); ndx++) {
FsVolumeSpi vol = references.get(ndx);

View File

@ -657,12 +657,16 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
FsVolumeSpi destination) throws IOException;
/**
* Acquire the lock of the data set.
* Acquire the lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
*/
AutoCloseableLock acquireDatasetLock();
/***
* Acquire the read lock of the data set.
* Acquire the read lock of the data set. This prevents other threads from
* modifying the volume map structure inside the datanode, but other changes
* are still possible. For example modifying the genStamp of a block instance.
* @return The AutoClosable read lock instance.
*/
AutoCloseableLock acquireDatasetReadLock();

View File

@ -183,7 +183,7 @@ public StorageReport[] getStorageReports(String bpid)
@Override
public FsVolumeImpl getVolume(final ExtendedBlock b) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final ReplicaInfo r =
volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null ? (FsVolumeImpl) r.getVolume() : null;
@ -193,7 +193,7 @@ public FsVolumeImpl getVolume(final ExtendedBlock b) {
@Override // FsDatasetSpi
public Block getStoredBlock(String bpid, long blkid)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo r = volumeMap.get(bpid, blkid);
if (r == null) {
return null;
@ -206,7 +206,7 @@ public Block getStoredBlock(String bpid, long blkid)
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
EMPTY_SET : volumeMap.replicas(bpid));
}
@ -302,7 +302,20 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
DFSConfigKeys.DFS_DATANODE_LOCK_REPORTING_THRESHOLD_MS_DEFAULT,
TimeUnit.MILLISECONDS));
this.datasetWriteLock = new AutoCloseableLock(datasetRWLock.writeLock());
boolean enableRL = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY,
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_DEFAULT);
// The read lock can be disabled by the above config key. If it is disabled
// then we simply make the both the read and write lock variables hold
// the write lock. All accesses to the lock are via these variables, so that
// effectively disables the read lock.
if (enableRL) {
LOG.info("The datanode lock is a read write lock");
this.datasetReadLock = new AutoCloseableLock(datasetRWLock.readLock());
} else {
LOG.info("The datanode lock is an exclusive write lock");
this.datasetReadLock = this.datasetWriteLock;
}
this.datasetWriteLockCondition = datasetWriteLock.newCondition();
// The number of volumes required for operation is the total number
@ -342,7 +355,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
}
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(datasetRWLock);
volumeMap = new ReplicaMap(datasetReadLock, datasetWriteLock);
ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
@ -475,7 +488,8 @@ private void addVolume(Storage.StorageDirectory sd) throws IOException {
.setConf(this.conf)
.build();
FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(datasetRWLock);
ReplicaMap tempVolumeMap =
new ReplicaMap(datasetReadLock, datasetWriteLock);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref);
@ -810,7 +824,7 @@ public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
ReplicaInfo info;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
}
@ -898,7 +912,7 @@ ReplicaInfo getReplicaInfo(String bpid, long blkid)
@Override // FsDatasetSpi
public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b,
long blkOffset, long metaOffset) throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
ReplicaInfo info = getReplicaInfo(b);
FsVolumeReference ref = info.getVolume().obtainReference();
try {
@ -1023,7 +1037,7 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
}
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = volumes.getNextVolume(targetStorageType, targetStorageId,
block.getNumBytes());
}
@ -1137,7 +1151,7 @@ public ReplicaInfo moveBlockAcrossVolumes(ExtendedBlock block, FsVolumeSpi
FsVolumeReference volumeRef = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
volumeRef = destination.obtainReference();
}
@ -1930,7 +1944,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = null;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
curVolumes = volumes.getVolumes();
for (FsVolumeSpi v : curVolumes) {
builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
@ -1989,7 +2003,7 @@ public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
*/
@Override
public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final List<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(
volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
@ -2082,9 +2096,7 @@ private boolean isValid(final ExtendedBlock b, final ReplicaState state) {
ReplicaInfo validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final ReplicaInfo r;
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
r = volumeMap.get(bpid, blockId);
}
if (r != null) {
if (r.blockDataExists()) {
return r;
@ -2327,7 +2339,7 @@ public boolean isCached(String bpid, long blockId) {
@Override // FsDatasetSpi
public boolean contains(final ExtendedBlock block) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final long blockId = block.getLocalBlock().getBlockId();
final String bpid = block.getBlockPoolId();
final ReplicaInfo r = volumeMap.get(bpid, blockId);
@ -2655,7 +2667,7 @@ public ReplicaInfo getReplica(String bpid, long blockId) {
@Override
public String getReplicaString(String bpid, long blockId) {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica r = volumeMap.get(bpid, blockId);
return r == null ? "null" : r.toString();
}
@ -2882,7 +2894,7 @@ private ReplicaInfo updateReplicaUnderRecovery(
@Override // FsDatasetSpi
public long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = getReplicaInfo(block.getBlockPoolId(),
block.getBlockId());
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
@ -3032,12 +3044,13 @@ public void deleteBlockPool(String bpid, boolean force)
@Override // FsDatasetSpi
public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
throws IOException {
try (AutoCloseableLock lock = datasetWriteLock.acquire()) {
try (AutoCloseableLock lock = datasetReadLock.acquire()) {
final Replica replica = volumeMap.get(block.getBlockPoolId(),
block.getBlockId());
if (replica == null) {
throw new ReplicaNotFoundException(block);
}
synchronized(replica) {
if (replica.getGenerationStamp() < block.getGenerationStamp()) {
throw new IOException(
"Replica generation stamp < block generation stamp, block="
@ -3046,6 +3059,7 @@ public BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block)
block.setGenerationStamp(replica.getGenerationStamp());
}
}
}
ReplicaInfo r = getBlockReplica(block);
File blockFile = new File(r.getBlockURI());

View File

@ -33,7 +33,6 @@
* Maintains the replica map.
*/
class ReplicaMap {
private final ReadWriteLock rwLock;
// Lock object to synchronize this instance.
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
@ -53,18 +52,22 @@ public int compare(Object o1, Object o2) {
}
};
ReplicaMap(ReadWriteLock lock) {
if (lock == null) {
ReplicaMap(AutoCloseableLock readLock, AutoCloseableLock writeLock) {
if (readLock == null || writeLock == null) {
throw new HadoopIllegalArgumentException(
"Lock to synchronize on cannot be null");
}
this.rwLock = lock;
this.readLock = new AutoCloseableLock(rwLock.readLock());
this.writeLock = new AutoCloseableLock(rwLock.writeLock());
this.readLock = readLock;
this.writeLock = writeLock;
}
ReplicaMap(ReadWriteLock lock) {
this(new AutoCloseableLock(lock.readLock()),
new AutoCloseableLock(lock.writeLock()));
}
String[] getBlockPoolList() {
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
return map.keySet().toArray(new String[map.keySet().size()]);
}
}
@ -109,7 +112,7 @@ ReplicaInfo get(String bpid, Block block) {
*/
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
if (set == null) {
return null;
@ -235,7 +238,7 @@ ReplicaInfo remove(String bpid, long blockId) {
* @return the number of replicas in the map
*/
int size(String bpid) {
try (AutoCloseableLock l = writeLock.acquire()) {
try (AutoCloseableLock l = readLock.acquire()) {
FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
return set != null ? set.size() : 0;
}
@ -281,4 +284,14 @@ void cleanUpBlockPool(String bpid) {
AutoCloseableLock getLock() {
return writeLock;
}
/**
* Get the lock object used for synchronizing the ReplicasMap for read only
* operations.
* @return The read lock object
*/
AutoCloseableLock getReadLock() {
return readLock;
}
}

View File

@ -3250,6 +3250,19 @@
</description>
</property>
<property>
<name>dfs.datanode.lock.read.write.enabled</name>
<value>true</value>
<description>If this is true, the FsDataset lock will be a read write lock. If
it is false, all locks will be a write lock.
Enabling this should give better datanode throughput, as many read only
functions can run concurrently under the read lock, when they would
previously have required the exclusive write lock. As the feature is
experimental, this switch can be used to disable the shared read lock, and
cause all lock acquisitions to use the exclusive write lock.
</description>
</property>
<property>
<name>dfs.datanode.lock-reporting-threshold-ms</name>
<value>300</value>

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
@ -84,6 +85,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
@ -198,6 +200,101 @@ public void setUp() throws IOException {
assertEquals(0, dataset.getNumFailedVolumes());
}
@Test
public void testReadLockEnabledByDefault()
throws IOException, InterruptedException {
final FsDatasetSpi ds = dataset;
AtomicBoolean accessed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
sleep(10000);
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
waiterLatch.countDown();
accessed.getAndSet(true);
} catch (Exception e) {
}
}
};
holder.start();
latch.await();
waiter.start();
waiterLatch.await();
// The holder thread is still holding the lock, but the waiter can still
// run as the lock is a shared read lock.
assertEquals(true, accessed.get());
holder.interrupt();
holder.join();
waiter.join();
}
@Test(timeout=10000)
public void testReadLockCanBeDisabledByConfig()
throws IOException, InterruptedException {
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build();
try {
cluster.waitActive();
DataNode dn = cluster.getDataNodes().get(0);
final FsDatasetSpi<?> ds = DataNodeTestUtils.getFSDataset(dn);
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1);
AtomicBoolean accessed = new AtomicBoolean(false);
Thread holder = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown();
sleep(10000);
} catch (Exception e) {
}
}
};
Thread waiter = new Thread() {
public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true);
waiterLatch.countDown();
} catch (Exception e) {
}
}
};
holder.start();
latch.await();
waiter.start();
Thread.sleep(200);
// Waiting thread should not have been able to update the variable
// as the read lock is disabled and hence an exclusive lock.
assertEquals(false, accessed.get());
holder.interrupt();
holder.join();
waiterLatch.await();
// After the holder thread exits, the variable is updated.
assertEquals(true, accessed.get());
waiter.join();
} finally {
cluster.shutdown();
}
}
@Test
public void testAddVolumes() throws IOException {
final int numNewVolumes = 3;
@ -244,8 +341,8 @@ public void testAddVolumes() throws IOException {
@Test
public void testAddVolumeWithSameStorageUuid() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
HdfsConfiguration config = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(config)
.numDataNodes(1).build();
try {
cluster.waitActive();