From 3ba058a894b14c13651a52219b72eff48d07f4f4 Mon Sep 17 00:00:00 2001
From: He Xiaoqiao
Date: Wed, 8 Feb 2023 11:19:07 +0800
Subject: [PATCH] HDFS-16898. Remove write lock for processCommandFromActor of DataNode to reduce impact on heartbeat (#5330). Contributed by ZhangHB.

Reviewed-by: zhangshuyan
Reviewed-by: Viraj Jasani
Signed-off-by: He Xiaoqiao
---
 .../hdfs/server/datanode/BPOfferService.java | 31 +++++++++++--------
 .../hdfs/server/datanode/BPServiceActor.java | 2 +-
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index d660970c72..fdd66cb05d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -679,15 +679,20 @@ boolean processCommandFromActor(DatanodeCommand cmd,
       actor.reRegister();
       return false;
     }
-    writeLock();
+    boolean isActiveActor;
+    InetSocketAddress nnSocketAddress;
+    readLock();
     try {
-      if (actor == bpServiceToActive) {
-        return processCommandFromActive(cmd, actor);
-      } else {
-        return processCommandFromStandby(cmd, actor);
-      }
+      isActiveActor = (actor == bpServiceToActive);
+      nnSocketAddress = actor.getNNSocketAddress();
     } finally {
-      writeUnlock();
+      readUnlock();
+    }
+
+    if (isActiveActor) {
+      return processCommandFromActive(cmd, nnSocketAddress);
+    } else {
+      return processCommandFromStandby(cmd, nnSocketAddress);
     }
   }
 
@@ -715,7 +720,7 @@ private String blockIdArrayToString(long ids[]) {
    * @throws IOException
    */
   private boolean processCommandFromActive(DatanodeCommand cmd,
-      BPServiceActor actor) throws IOException {
+      InetSocketAddress nnSocketAddress) throws IOException {
     final BlockCommand bcmd =
       cmd instanceof BlockCommand? (BlockCommand)cmd: null;
     final BlockIdCommand blockIdCmd =
@@ -768,7 +773,7 @@ assert getBlockPoolId().equals(bp) :
       dn.finalizeUpgradeForPool(bp);
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-      String who = "NameNode at " + actor.getNNSocketAddress();
+      String who = "NameNode at " + nnSocketAddress;
       dn.getBlockRecoveryWorker().recoverBlocks(who,
           ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
@@ -810,11 +815,11 @@ assert getBlockPoolId().equals(bp) :
    * DNA_REGISTER which should be handled earlier itself.
    */
   private boolean processCommandFromStandby(DatanodeCommand cmd,
-      BPServiceActor actor) throws IOException {
+      InetSocketAddress nnSocketAddress) throws IOException {
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE",
-          actor.getNNSocketAddress());
+          nnSocketAddress);
       if (dn.isBlockTokenEnabled) {
         dn.blockPoolTokenSecretManager.addKeys(
             getBlockPoolId(),
@@ -831,11 +836,11 @@ private boolean processCommandFromStandby(DatanodeCommand cmd,
     case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION:
       LOG.warn("Got a command from standby NN {} - ignoring command: {}",
-          actor.getNNSocketAddress(), cmd.getAction());
+          nnSocketAddress, cmd.getAction());
       break;
     default:
       LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}",
-          cmd.getAction(), actor.getNNSocketAddress());
+          cmd.getAction(), nnSocketAddress);
     }
     return true;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 35ab619314..e9f424604b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -1499,7 +1499,7 @@ private boolean processCommand(DatanodeCommand[] cmds) {
         dn.getMetrics().addNumProcessedCommands(processCommandsMs);
       }
       if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) {
-        LOG.info("Took {} ms to process {} commands from NN",
+        LOG.warn("Took {} ms to process {} commands from NN",
             processCommandsMs, cmds.length);
       }
     }