HDFS-16898. Remove write lock for processCommandFromActor of DataNode to reduce impact on heartbeat (#5330). Contributed by ZhangHB.

Reviewed-by: zhangshuyan <zqingchai@gmail.com>
Reviewed-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
He Xiaoqiao 2023-02-08 11:19:07 +08:00
parent 7e919212c4
commit 3ba058a894
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
2 changed files with 19 additions and 14 deletions

View File

@ -679,15 +679,20 @@ boolean processCommandFromActor(DatanodeCommand cmd,
actor.reRegister(); actor.reRegister();
return false; return false;
} }
writeLock(); boolean isActiveActor;
InetSocketAddress nnSocketAddress;
readLock();
try { try {
if (actor == bpServiceToActive) { isActiveActor = (actor == bpServiceToActive);
return processCommandFromActive(cmd, actor); nnSocketAddress = actor.getNNSocketAddress();
} else {
return processCommandFromStandby(cmd, actor);
}
} finally { } finally {
writeUnlock(); readUnlock();
}
if (isActiveActor) {
return processCommandFromActive(cmd, nnSocketAddress);
} else {
return processCommandFromStandby(cmd, nnSocketAddress);
} }
} }
@ -715,7 +720,7 @@ private String blockIdArrayToString(long ids[]) {
* @throws IOException * @throws IOException
*/ */
private boolean processCommandFromActive(DatanodeCommand cmd, private boolean processCommandFromActive(DatanodeCommand cmd,
BPServiceActor actor) throws IOException { InetSocketAddress nnSocketAddress) throws IOException {
final BlockCommand bcmd = final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null; cmd instanceof BlockCommand? (BlockCommand)cmd: null;
final BlockIdCommand blockIdCmd = final BlockIdCommand blockIdCmd =
@ -768,7 +773,7 @@ assert getBlockPoolId().equals(bp) :
dn.finalizeUpgradeForPool(bp); dn.finalizeUpgradeForPool(bp);
break; break;
case DatanodeProtocol.DNA_RECOVERBLOCK: case DatanodeProtocol.DNA_RECOVERBLOCK:
String who = "NameNode at " + actor.getNNSocketAddress(); String who = "NameNode at " + nnSocketAddress;
dn.getBlockRecoveryWorker().recoverBlocks(who, dn.getBlockRecoveryWorker().recoverBlocks(who,
((BlockRecoveryCommand)cmd).getRecoveringBlocks()); ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break; break;
@ -810,11 +815,11 @@ assert getBlockPoolId().equals(bp) :
* DNA_REGISTER which should be handled earlier itself. * DNA_REGISTER which should be handled earlier itself.
*/ */
private boolean processCommandFromStandby(DatanodeCommand cmd, private boolean processCommandFromStandby(DatanodeCommand cmd,
BPServiceActor actor) throws IOException { InetSocketAddress nnSocketAddress) throws IOException {
switch(cmd.getAction()) { switch(cmd.getAction()) {
case DatanodeProtocol.DNA_ACCESSKEYUPDATE: case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE", LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
actor.getNNSocketAddress()); nnSocketAddress);
if (dn.isBlockTokenEnabled) { if (dn.isBlockTokenEnabled) {
dn.blockPoolTokenSecretManager.addKeys( dn.blockPoolTokenSecretManager.addKeys(
getBlockPoolId(), getBlockPoolId(),
@ -831,11 +836,11 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
case DatanodeProtocol.DNA_UNCACHE: case DatanodeProtocol.DNA_UNCACHE:
case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
LOG.warn("Got a command from standby NN {} - ignoring command: {}", LOG.warn("Got a command from standby NN {} - ignoring command: {}",
actor.getNNSocketAddress(), cmd.getAction()); nnSocketAddress, cmd.getAction());
break; break;
default: default:
LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}", LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
cmd.getAction(), actor.getNNSocketAddress()); cmd.getAction(), nnSocketAddress);
} }
return true; return true;
} }

View File

@ -1499,7 +1499,7 @@ private boolean processCommand(DatanodeCommand[] cmds) {
dn.getMetrics().addNumProcessedCommands(processCommandsMs); dn.getMetrics().addNumProcessedCommands(processCommandsMs);
} }
if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) { if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
LOG.info("Took {} ms to process {} commands from NN", LOG.warn("Took {} ms to process {} commands from NN",
processCommandsMs, cmds.length); processCommandsMs, cmds.length);
} }
} }