HDFS-16434. Add opname to read/write lock for remaining operations (#3915)
This commit is contained in:
parent
ffa0eab488
commit
565e848d88
@ -383,7 +383,7 @@ protected void logUpdateMasterKey(DelegationKey key)
|
||||
namesystem.logUpdateMasterKey(key);
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
namesystem.readUnlock("logUpdateMasterKey");
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
// AbstractDelegationTokenManager may crash if an exception is thrown.
|
||||
@ -412,7 +412,7 @@ protected void logExpireToken(final DelegationTokenIdentifier dtId)
|
||||
namesystem.logExpireDelegationToken(dtId);
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
namesystem.readUnlock("logExpireToken");
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
// AbstractDelegationTokenManager may crash if an exception is thrown.
|
||||
|
@ -2017,7 +2017,7 @@ int computeBlockReconstructionWork(int blocksToProcess) {
|
||||
blocksToReconstruct = neededReconstruction
|
||||
.chooseLowRedundancyBlocks(blocksToProcess, reset);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("computeBlockReconstructionWork");
|
||||
}
|
||||
return computeReconstructionWorkForBlocks(blocksToReconstruct);
|
||||
}
|
||||
@ -2051,7 +2051,7 @@ int computeReconstructionWorkForBlocks(
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
|
||||
}
|
||||
|
||||
// Step 2: choose target nodes for each reconstruction task
|
||||
@ -2092,7 +2092,7 @@ int computeReconstructionWorkForBlocks(
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
|
||||
}
|
||||
|
||||
if (blockLog.isDebugEnabled()) {
|
||||
@ -2577,7 +2577,7 @@ void processPendingReconstructions() {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processPendingReconstructions");
|
||||
}
|
||||
/* If we know the target datanodes where the replication timedout,
|
||||
* we could invoke decBlocksScheduled() on it. Its ok for now.
|
||||
@ -2826,7 +2826,7 @@ public boolean processReport(final DatanodeID nodeID,
|
||||
storageInfo.receivedBlockReport();
|
||||
} finally {
|
||||
endTime = Time.monotonicNow();
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processReport");
|
||||
}
|
||||
|
||||
if(blockLog.isDebugEnabled()) {
|
||||
@ -2870,7 +2870,7 @@ public void removeBRLeaseIfNeeded(final DatanodeID nodeID,
|
||||
context.getTotalRpcs(), Long.toHexString(context.getReportId()));
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("removeBRLeaseIfNeeded");
|
||||
}
|
||||
}
|
||||
|
||||
@ -2908,7 +2908,7 @@ void rescanPostponedMisreplicatedBlocks() {
|
||||
postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
|
||||
rescannedMisreplicatedBlocks.clear();
|
||||
long endSize = postponedMisreplicatedBlocks.size();
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("rescanPostponedMisreplicatedBlocks");
|
||||
LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
|
||||
" msecs. {} blocks are left. {} blocks were removed.",
|
||||
(Time.monotonicNow() - startTime), endSize, (startSize - endSize));
|
||||
@ -3775,7 +3775,7 @@ private void processMisReplicatesAsync() throws InterruptedException {
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processMisReplicatesAsync");
|
||||
// Make sure it is out of the write lock for sufficiently long time.
|
||||
Thread.sleep(sleepDuration);
|
||||
}
|
||||
@ -3830,7 +3830,7 @@ public int processMisReplicatedBlocks(List<BlockInfo> blocks) {
|
||||
"Re-scanned block {}, result is {}", blk, r);
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processMisReplicatedBlocks");
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
@ -4553,7 +4553,7 @@ void processExtraRedundancyBlocksOnInService(
|
||||
// testPlacementWithLocalRackNodesDecommissioned, it is not protected by
|
||||
// lock, only when called by DatanodeManager.refreshNodes have writeLock
|
||||
if (namesystem.hasWriteLock()) {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processExtraRedundancyBlocksOnInService");
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
@ -4685,7 +4685,7 @@ private void updateNeededReconstructions(final BlockInfo block,
|
||||
repl.outOfServiceReplicas(), oldExpectedReplicas);
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("updateNeededReconstructions");
|
||||
}
|
||||
}
|
||||
|
||||
@ -4742,7 +4742,7 @@ private int invalidateWorkForOneNode(DatanodeInfo dn) {
|
||||
return 0;
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("invalidateWorkForOneNode");
|
||||
}
|
||||
blockLog.debug("BLOCK* {}: ask {} to delete {}", getClass().getSimpleName(),
|
||||
dn, toInvalidate);
|
||||
@ -4974,7 +4974,7 @@ private void remove(long time) {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("markedDeleteBlockScrubberThread");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5092,7 +5092,7 @@ int computeDatanodeWork() {
|
||||
this.updateState();
|
||||
this.scheduledReplicationBlocksCount = workFound;
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("computeDatanodeWork");
|
||||
}
|
||||
workFound += this.computeInvalidateWork(nodesToProcess);
|
||||
return workFound;
|
||||
@ -5332,7 +5332,7 @@ private void processQueue() {
|
||||
action = queue.poll();
|
||||
} while (action != null);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processQueue");
|
||||
metrics.addBlockOpsBatched(processed - 1);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -670,7 +670,7 @@ public void run() {
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("leaveSafeMode");
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -302,7 +302,7 @@ private void rescan() throws InterruptedException {
|
||||
rescanCachedBlockMap();
|
||||
blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("cacheReplicationMonitorRescan");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -215,7 +215,7 @@ public void run() {
|
||||
|
||||
processPendingNodes();
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("DatanodeAdminMonitorV2Thread");
|
||||
}
|
||||
// After processing the above, various parts of the check() method will
|
||||
// take and drop the read / write lock as needed. Aside from the
|
||||
@ -345,12 +345,12 @@ private void processMaintenanceNodes() {
|
||||
// which added the node to the cancelled list. Therefore expired
|
||||
// maintenance nodes do not need to be added to the toRemove list.
|
||||
dnAdmin.stopMaintenance(dn);
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processMaintenanceNodes");
|
||||
namesystem.writeLock();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processMaintenanceNodes");
|
||||
}
|
||||
}
|
||||
|
||||
@ -409,7 +409,7 @@ private void processCompletedNodes(List<DatanodeDescriptor> toRemove) {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processCompletedNodes");
|
||||
}
|
||||
}
|
||||
|
||||
@ -531,7 +531,7 @@ private void moveBlocksToPending() {
|
||||
// replication
|
||||
if (blocksProcessed >= blocksPerLock) {
|
||||
blocksProcessed = 0;
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("moveBlocksToPending");
|
||||
namesystem.writeLock();
|
||||
}
|
||||
blocksProcessed++;
|
||||
@ -553,7 +553,7 @@ private void moveBlocksToPending() {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("moveBlocksToPending");
|
||||
}
|
||||
LOG.debug("{} blocks are now pending replication", pendingCount);
|
||||
}
|
||||
@ -637,7 +637,7 @@ private void scanDatanodeStorage(DatanodeDescriptor dn,
|
||||
try {
|
||||
storage = dn.getStorageInfos();
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
namesystem.readUnlock("scanDatanodeStorage");
|
||||
}
|
||||
|
||||
for (DatanodeStorageInfo s : storage) {
|
||||
@ -667,7 +667,7 @@ private void scanDatanodeStorage(DatanodeDescriptor dn,
|
||||
numBlocksChecked++;
|
||||
}
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
namesystem.readUnlock("scanDatanodeStorage");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -722,7 +722,7 @@ private void processPendingReplication() {
|
||||
suspectBlocks.getOutOfServiceBlockCount());
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processPendingReplication");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ public void run() {
|
||||
LOG.warn("DatanodeAdminMonitor caught exception when processing node.",
|
||||
e);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("DatanodeAdminMonitorThread");
|
||||
}
|
||||
if (numBlocksChecked + numNodesChecked > 0) {
|
||||
LOG.info("Checked {} blocks and {} nodes this tick. {} nodes are now " +
|
||||
@ -373,7 +373,7 @@ private void processBlocksInternal(
|
||||
// lock.
|
||||
// Yielding is required in case of block number is greater than the
|
||||
// configured per-iteration-limit.
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("processBlocksInternal");
|
||||
try {
|
||||
LOG.debug("Yielded lock during decommission/maintenance check");
|
||||
Thread.sleep(0, 500);
|
||||
|
@ -852,7 +852,7 @@ public void removeDatanode(final DatanodeID node)
|
||||
+ node + " does not exist");
|
||||
}
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("removeDatanode");
|
||||
}
|
||||
}
|
||||
|
||||
@ -1296,7 +1296,7 @@ public void refreshNodes(final Configuration conf) throws IOException {
|
||||
refreshDatanodes();
|
||||
countSoftwareVersions();
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("refreshNodes");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -503,7 +503,7 @@ void heartbeatCheck() {
|
||||
try {
|
||||
dm.removeDeadDatanode(dead, !dead.isMaintenance());
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("removeDeadDatanode");
|
||||
}
|
||||
}
|
||||
if (failedStorage != null) {
|
||||
@ -512,7 +512,7 @@ void heartbeatCheck() {
|
||||
try {
|
||||
blockManager.removeBlocksAssociatedTo(failedStorage);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("removeBlocksAssociatedTo");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -222,7 +222,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
|
||||
try {
|
||||
getNamesystem().dir.updateCountForQuota();
|
||||
} finally {
|
||||
getNamesystem().writeUnlock();
|
||||
getNamesystem().writeUnlock("applyEdits");
|
||||
}
|
||||
} finally {
|
||||
backupInputStream.clear();
|
||||
|
@ -250,7 +250,7 @@ void doCheckpoint() throws IOException {
|
||||
sig.mostRecentCheckpointTxId);
|
||||
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
|
||||
} finally {
|
||||
backupNode.namesystem.writeUnlock();
|
||||
backupNode.namesystem.writeUnlock("doCheckpointByBackupNode");
|
||||
}
|
||||
}
|
||||
rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
|
||||
|
@ -192,7 +192,7 @@ public void pauseForTestingAfterNthCheckpoint(final String zone,
|
||||
try {
|
||||
iip = dir.resolvePath(pc, zone, DirOp.READ);
|
||||
} finally {
|
||||
dir.getFSNamesystem().readUnlock();
|
||||
dir.getFSNamesystem().readUnlock("pauseForTestingAfterNthCheckpoint");
|
||||
}
|
||||
reencryptionHandler
|
||||
.pauseForTestingAfterNthCheckpoint(iip.getLastINode().getId(), count);
|
||||
@ -224,7 +224,7 @@ public ZoneReencryptionStatus getZoneStatus(final String zone)
|
||||
return getReencryptionStatus().getZoneStatus(inode.getId());
|
||||
} finally {
|
||||
dir.readUnlock();
|
||||
dir.getFSNamesystem().readUnlock();
|
||||
dir.getFSNamesystem().readUnlock("getZoneStatus");
|
||||
}
|
||||
}
|
||||
|
||||
@ -285,7 +285,7 @@ void stopReencryptThread() {
|
||||
try {
|
||||
reencryptionHandler.stopThreads();
|
||||
} finally {
|
||||
dir.getFSNamesystem().writeUnlock();
|
||||
dir.getFSNamesystem().writeUnlock("stopReencryptThread");
|
||||
}
|
||||
if (reencryptHandlerExecutor != null) {
|
||||
reencryptHandlerExecutor.shutdownNow();
|
||||
|
@ -656,7 +656,7 @@ static EncryptionKeyInfo getEncryptionKeyInfo(FSNamesystem fsn,
|
||||
Preconditions.checkNotNull(ezKeyName);
|
||||
|
||||
// Generate EDEK while not holding the fsn lock.
|
||||
fsn.writeUnlock();
|
||||
fsn.writeUnlock("getEncryptionKeyInfo");
|
||||
try {
|
||||
EncryptionFaultInjector.getInstance().startFileBeforeGenerateKey();
|
||||
return new EncryptionKeyInfo(protocolVersion, suite, ezKeyName,
|
||||
@ -733,7 +733,7 @@ static String getKeyNameForZone(final FSDirectory dir,
|
||||
dir.ezManager.checkEncryptionZoneRoot(iip.getLastINode(), zone);
|
||||
return dir.ezManager.getKeyName(iip);
|
||||
} finally {
|
||||
dir.getFSNamesystem().readUnlock();
|
||||
dir.getFSNamesystem().readUnlock("getKeyNameForZone");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1798,6 +1798,7 @@ public void readUnlock() {
|
||||
this.fsLock.readUnlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readUnlock(String opName) {
|
||||
this.fsLock.readUnlock(opName);
|
||||
}
|
||||
@ -1822,6 +1823,7 @@ public void writeUnlock() {
|
||||
this.fsLock.writeUnlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeUnlock(String opName) {
|
||||
this.fsLock.writeUnlock(opName);
|
||||
}
|
||||
|
@ -243,7 +243,7 @@ public void run() {
|
||||
loader.load(fsImageFile, false);
|
||||
} finally {
|
||||
namesystem.getFSDirectory().writeUnlock();
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("loadImage");
|
||||
}
|
||||
}
|
||||
t.cancel();
|
||||
|
@ -2090,7 +2090,7 @@ public void writeLock() {
|
||||
@Override
|
||||
public void writeUnlock() {
|
||||
namesystem.unlockRetryCache();
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("HAState");
|
||||
}
|
||||
|
||||
/** Check if an operation of given category is allowed */
|
||||
@ -2254,7 +2254,7 @@ private String reconfReplicationParameters(final String newVal,
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property), e);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("reconfReplicationParameters");
|
||||
}
|
||||
}
|
||||
|
||||
@ -2291,7 +2291,7 @@ private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property), nfe);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("reconfHeartbeatInterval");
|
||||
LOG.info("RECONFIGURE* changed heartbeatInterval to "
|
||||
+ datanodeManager.getHeartbeatInterval());
|
||||
}
|
||||
@ -2315,7 +2315,7 @@ private String reconfHeartbeatRecheckInterval(
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property), nfe);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("reconfHeartbeatRecheckInterval");
|
||||
LOG.info("RECONFIGURE* changed heartbeatRecheckInterval to "
|
||||
+ datanodeManager.getHeartbeatRecheckInterval());
|
||||
}
|
||||
@ -2434,7 +2434,7 @@ String reconfigureSlowNodesParameters(final DatanodeManager datanodeManager,
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(
|
||||
property), e);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("reconfigureSlowNodesParameters");
|
||||
}
|
||||
}
|
||||
|
||||
@ -2454,7 +2454,7 @@ private String reconfigureBlockInvalidateLimit(final DatanodeManager datanodeMan
|
||||
} catch (NumberFormatException e) {
|
||||
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("reconfigureBlockInvalidateLimit");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -350,7 +350,7 @@ public void run() {
|
||||
getReencryptionStatus().markZoneStarted(zoneId);
|
||||
resetSubmissionTracker(zoneId);
|
||||
} finally {
|
||||
dir.getFSNamesystem().readUnlock();
|
||||
dir.getFSNamesystem().readUnlock("reEncryptThread");
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1098,7 +1098,7 @@ void doMerge(
|
||||
try {
|
||||
dstImage.reloadFromImageFile(file, dstNamesystem);
|
||||
} finally {
|
||||
dstNamesystem.writeUnlock();
|
||||
dstNamesystem.writeUnlock("reloadFromImageFile");
|
||||
}
|
||||
dstNamesystem.imageLoadComplete();
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ public long doTailEdits() throws IOException, InterruptedException {
|
||||
lastLoadedTxnId = image.getLastAppliedTxId();
|
||||
return editsLoaded;
|
||||
} finally {
|
||||
namesystem.writeUnlock();
|
||||
namesystem.writeUnlock("doTailEdits");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +79,7 @@ private void gcDeletedSnapshot(String name) {
|
||||
LOG.error("Failed to chooseDeletedSnapshot", e);
|
||||
throw e;
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
namesystem.readUnlock("gcDeletedSnapshot");
|
||||
}
|
||||
if (deleted == null) {
|
||||
LOG.trace("{}: no snapshots are marked as deleted.", name);
|
||||
|
@ -28,6 +28,12 @@ public interface RwLock {
|
||||
/** Release read lock. */
|
||||
public void readUnlock();
|
||||
|
||||
/**
|
||||
* Release read lock with operation name.
|
||||
* @param opName Option name.
|
||||
*/
|
||||
public void readUnlock(String opName);
|
||||
|
||||
/** Check if the current thread holds read lock. */
|
||||
public boolean hasReadLock();
|
||||
|
||||
@ -40,6 +46,12 @@ public interface RwLock {
|
||||
/** Release write lock. */
|
||||
public void writeUnlock();
|
||||
|
||||
/**
|
||||
* Release write lock with operation name.
|
||||
* @param opName Option name.
|
||||
*/
|
||||
public void writeUnlock(String opName);
|
||||
|
||||
/** Check if the current thread holds write lock. */
|
||||
public boolean hasWriteLock();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user