HDFS-14731. [FGL] Remove redundant locking on NameNode. Contributed by Konstantin V Shvachko.
This commit is contained in:
parent
ed70c115a8
commit
ecbcb058b8
@ -218,7 +218,12 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
|
|||||||
}
|
}
|
||||||
lastAppliedTxId = logLoader.getLastAppliedTxId();
|
lastAppliedTxId = logLoader.getLastAppliedTxId();
|
||||||
|
|
||||||
getNamesystem().dir.updateCountForQuota();
|
getNamesystem().writeLock();
|
||||||
|
try {
|
||||||
|
getNamesystem().dir.updateCountForQuota();
|
||||||
|
} finally {
|
||||||
|
getNamesystem().writeUnlock();
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
backupInputStream.clear();
|
backupInputStream.clear();
|
||||||
}
|
}
|
||||||
|
@ -187,11 +187,11 @@ public void pauseForTestingAfterNthCheckpoint(final String zone,
|
|||||||
final int count) throws IOException {
|
final int count) throws IOException {
|
||||||
INodesInPath iip;
|
INodesInPath iip;
|
||||||
final FSPermissionChecker pc = dir.getPermissionChecker();
|
final FSPermissionChecker pc = dir.getPermissionChecker();
|
||||||
dir.readLock();
|
dir.getFSNamesystem().readLock();
|
||||||
try {
|
try {
|
||||||
iip = dir.resolvePath(pc, zone, DirOp.READ);
|
iip = dir.resolvePath(pc, zone, DirOp.READ);
|
||||||
} finally {
|
} finally {
|
||||||
dir.readUnlock();
|
dir.getFSNamesystem().readUnlock();
|
||||||
}
|
}
|
||||||
reencryptionHandler
|
reencryptionHandler
|
||||||
.pauseForTestingAfterNthCheckpoint(iip.getLastINode().getId(), count);
|
.pauseForTestingAfterNthCheckpoint(iip.getLastINode().getId(), count);
|
||||||
@ -280,11 +280,11 @@ void stopReencryptThread() {
|
|||||||
if (getProvider() == null || reencryptionHandler == null) {
|
if (getProvider() == null || reencryptionHandler == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
dir.writeLock();
|
dir.getFSNamesystem().writeLock();
|
||||||
try {
|
try {
|
||||||
reencryptionHandler.stopThreads();
|
reencryptionHandler.stopThreads();
|
||||||
} finally {
|
} finally {
|
||||||
dir.writeUnlock();
|
dir.getFSNamesystem().writeUnlock();
|
||||||
}
|
}
|
||||||
if (reencryptHandlerExecutor != null) {
|
if (reencryptHandlerExecutor != null) {
|
||||||
reencryptHandlerExecutor.shutdownNow();
|
reencryptHandlerExecutor.shutdownNow();
|
||||||
|
@ -382,7 +382,6 @@ private static ZoneEncryptionInfoProto getZoneEncryptionInfoProto(
|
|||||||
static void saveFileXAttrsForBatch(FSDirectory fsd,
|
static void saveFileXAttrsForBatch(FSDirectory fsd,
|
||||||
List<FileEdekInfo> batch) {
|
List<FileEdekInfo> batch) {
|
||||||
assert fsd.getFSNamesystem().hasWriteLock();
|
assert fsd.getFSNamesystem().hasWriteLock();
|
||||||
assert !fsd.hasWriteLock();
|
|
||||||
if (batch != null && !batch.isEmpty()) {
|
if (batch != null && !batch.isEmpty()) {
|
||||||
for (FileEdekInfo entry : batch) {
|
for (FileEdekInfo entry : batch) {
|
||||||
final INode inode = fsd.getInode(entry.getInodeId());
|
final INode inode = fsd.getInode(entry.getInodeId());
|
||||||
@ -727,13 +726,13 @@ static String getKeyNameForZone(final FSDirectory dir,
|
|||||||
final FSPermissionChecker pc, final String zone) throws IOException {
|
final FSPermissionChecker pc, final String zone) throws IOException {
|
||||||
assert dir.getProvider() != null;
|
assert dir.getProvider() != null;
|
||||||
final INodesInPath iip;
|
final INodesInPath iip;
|
||||||
dir.readLock();
|
dir.getFSNamesystem().readLock();
|
||||||
try {
|
try {
|
||||||
iip = dir.resolvePath(pc, zone, DirOp.READ);
|
iip = dir.resolvePath(pc, zone, DirOp.READ);
|
||||||
dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), zone);
|
dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), zone);
|
||||||
return dir.ezManager.getKeyName(iip);
|
return dir.ezManager.getKeyName(iip);
|
||||||
} finally {
|
} finally {
|
||||||
dir.readUnlock();
|
dir.getFSNamesystem().readUnlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,6 @@
|
|||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ForkJoinPool;
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.RecursiveAction;
|
import java.util.concurrent.RecursiveAction;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES;
|
import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_PROTECTED_DIRECTORIES;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
|
||||||
@ -172,9 +171,6 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
|
|||||||
// Each entry in this set must be a normalized path.
|
// Each entry in this set must be a normalized path.
|
||||||
private volatile SortedSet<String> protectedDirectories;
|
private volatile SortedSet<String> protectedDirectories;
|
||||||
|
|
||||||
// lock to protect the directory and BlockMap
|
|
||||||
private final ReentrantReadWriteLock dirLock;
|
|
||||||
|
|
||||||
private final boolean isPermissionEnabled;
|
private final boolean isPermissionEnabled;
|
||||||
private final boolean isPermissionContentSummarySubAccess;
|
private final boolean isPermissionContentSummarySubAccess;
|
||||||
/**
|
/**
|
||||||
@ -215,37 +211,44 @@ public void setINodeAttributeProvider(INodeAttributeProvider provider) {
|
|||||||
attributeProvider = provider;
|
attributeProvider = provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
// utility methods to acquire and release read lock and write lock
|
/**
|
||||||
|
* The directory lock dirLock provided redundant locking.
|
||||||
|
* It has been used whenever namesystem.fsLock was used.
|
||||||
|
* dirLock is now removed and utility methods to acquire and release dirLock
|
||||||
|
* remain as placeholders only
|
||||||
|
*/
|
||||||
void readLock() {
|
void readLock() {
|
||||||
this.dirLock.readLock().lock();
|
assert namesystem.hasReadLock() : "Should hold namesystem read lock";
|
||||||
}
|
}
|
||||||
|
|
||||||
void readUnlock() {
|
void readUnlock() {
|
||||||
this.dirLock.readLock().unlock();
|
assert namesystem.hasReadLock() : "Should hold namesystem read lock";
|
||||||
}
|
}
|
||||||
|
|
||||||
void writeLock() {
|
void writeLock() {
|
||||||
this.dirLock.writeLock().lock();
|
assert namesystem.hasWriteLock() : "Should hold namesystem write lock";
|
||||||
}
|
}
|
||||||
|
|
||||||
void writeUnlock() {
|
void writeUnlock() {
|
||||||
this.dirLock.writeLock().unlock();
|
assert namesystem.hasWriteLock() : "Should hold namesystem write lock";
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasWriteLock() {
|
boolean hasWriteLock() {
|
||||||
return this.dirLock.isWriteLockedByCurrentThread();
|
return namesystem.hasWriteLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasReadLock() {
|
boolean hasReadLock() {
|
||||||
return this.dirLock.getReadHoldCount() > 0 || hasWriteLock();
|
return namesystem.hasReadLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated // dirLock is obsolete, use namesystem.fsLock instead
|
||||||
public int getReadHoldCount() {
|
public int getReadHoldCount() {
|
||||||
return this.dirLock.getReadHoldCount();
|
return namesystem.getReadHoldCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated // dirLock is obsolete, use namesystem.fsLock instead
|
||||||
public int getWriteHoldCount() {
|
public int getWriteHoldCount() {
|
||||||
return this.dirLock.getWriteHoldCount();
|
return namesystem.getWriteHoldCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getListLimit() {
|
public int getListLimit() {
|
||||||
@ -273,7 +276,6 @@ public enum DirOp {
|
|||||||
};
|
};
|
||||||
|
|
||||||
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
|
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
|
||||||
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
|
||||||
this.inodeId = new INodeId();
|
this.inodeId = new INodeId();
|
||||||
rootDir = createRoot(ns);
|
rootDir = createRoot(ns);
|
||||||
inodeMap = INodeMap.newInstance(rootDir);
|
inodeMap = INodeMap.newInstance(rootDir);
|
||||||
@ -1492,12 +1494,7 @@ public final void removeFromInodeMap(List<? extends INode> inodes) {
|
|||||||
* @return The inode associated with the given id
|
* @return The inode associated with the given id
|
||||||
*/
|
*/
|
||||||
public INode getInode(long id) {
|
public INode getInode(long id) {
|
||||||
readLock();
|
return inodeMap.get(id);
|
||||||
try {
|
|
||||||
return inodeMap.get(id);
|
|
||||||
} finally {
|
|
||||||
readUnlock();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -3723,6 +3723,7 @@ INodeFile getBlockCollection(BlockInfo b) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public INodeFile getBlockCollection(long id) {
|
public INodeFile getBlockCollection(long id) {
|
||||||
|
assert hasReadLock() : "Accessing INode id = " + id + " without read lock";
|
||||||
INode inode = getFSDirectory().getInode(id);
|
INode inode = getFSDirectory().getInode(id);
|
||||||
return inode == null ? null : inode.asFile();
|
return inode == null ? null : inode.asFile();
|
||||||
}
|
}
|
||||||
|
@ -338,7 +338,7 @@ public void run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
final Long zoneId;
|
final Long zoneId;
|
||||||
dir.readLock();
|
dir.getFSNamesystem().readLock();
|
||||||
try {
|
try {
|
||||||
zoneId = getReencryptionStatus().getNextUnprocessedZone();
|
zoneId = getReencryptionStatus().getNextUnprocessedZone();
|
||||||
if (zoneId == null) {
|
if (zoneId == null) {
|
||||||
@ -350,7 +350,7 @@ public void run() {
|
|||||||
getReencryptionStatus().markZoneStarted(zoneId);
|
getReencryptionStatus().markZoneStarted(zoneId);
|
||||||
resetSubmissionTracker(zoneId);
|
resetSubmissionTracker(zoneId);
|
||||||
} finally {
|
} finally {
|
||||||
dir.readUnlock();
|
dir.getFSNamesystem().readUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -570,7 +570,8 @@ public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
|
|||||||
BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
|
BlockInfo storedBlock = bm.getStoredBlock(b.getLocalBlock());
|
||||||
|
|
||||||
// The block should be replicated OK - so Reconstruction Work will be null
|
// The block should be replicated OK - so Reconstruction Work will be null
|
||||||
BlockReconstructionWork work = bm.scheduleReconstruction(storedBlock, 2);
|
BlockReconstructionWork work = scheduleReconstruction(
|
||||||
|
cluster.getNamesystem(), storedBlock, 2);
|
||||||
assertNull(work);
|
assertNull(work);
|
||||||
// Set the upgradeDomain to "3" for the 3 nodes hosting the block.
|
// Set the upgradeDomain to "3" for the 3 nodes hosting the block.
|
||||||
// Then alternately set the remaining 3 nodes to have an upgradeDomain
|
// Then alternately set the remaining 3 nodes to have an upgradeDomain
|
||||||
@ -586,7 +587,8 @@ public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Now reconWork is non-null and 2 extra targets are needed
|
// Now reconWork is non-null and 2 extra targets are needed
|
||||||
work = bm.scheduleReconstruction(storedBlock, 2);
|
work = scheduleReconstruction(
|
||||||
|
cluster.getNamesystem(), storedBlock, 2);
|
||||||
assertEquals(2, work.getAdditionalReplRequired());
|
assertEquals(2, work.getAdditionalReplRequired());
|
||||||
|
|
||||||
// Add the block to the replication queue and ensure it is replicated
|
// Add the block to the replication queue and ensure it is replicated
|
||||||
@ -598,6 +600,16 @@ public void testMultipleReplicasScheduledForUpgradeDomain() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static BlockReconstructionWork scheduleReconstruction(
|
||||||
|
FSNamesystem fsn, BlockInfo block, int priority) {
|
||||||
|
fsn.writeLock();
|
||||||
|
try {
|
||||||
|
return fsn.getBlockManager().scheduleReconstruction(block, priority);
|
||||||
|
} finally {
|
||||||
|
fsn.writeUnlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
|
public void testUnderReplicatedRespectsRacksAndUpgradeDomain()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user