From 0fd2b10bdc0a63696348b03590651de26d14b3a5 Mon Sep 17 00:00:00 2001 From: hfutatzhanghb Date: Mon, 12 Jun 2023 13:42:39 +0800 Subject: [PATCH] HDFS-16898. Remove write lock for processCommandFromActor of DataNode to reduce impact on heartbeat. (#5408). Contributed by ZhangHB. Signed-off-by: He Xiaoqiao --- .../hdfs/server/datanode/BPOfferService.java | 34 ++++++++++++------- .../hdfs/server/datanode/BPServiceActor.java | 2 +- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 40046b0d8c..b8a52aa79f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -678,15 +678,20 @@ boolean processCommandFromActor(DatanodeCommand cmd, actor.reRegister(); return false; } - writeLock(); + boolean isActiveActor; + InetSocketAddress nnSocketAddress; + readLock(); try { - if (actor == bpServiceToActive) { - return processCommandFromActive(cmd, actor); - } else { - return processCommandFromStandby(cmd, actor); - } + isActiveActor = (actor == bpServiceToActive); + nnSocketAddress = actor.getNNSocketAddress(); } finally { - writeUnlock(); + readUnlock(); + } + + if (isActiveActor) { + return processCommandFromActive(cmd, nnSocketAddress); + } else { + return processCommandFromStandby(cmd, nnSocketAddress); } } @@ -714,7 +719,7 @@ private String blockIdArrayToString(long ids[]) { * @throws IOException */ private boolean processCommandFromActive(DatanodeCommand cmd, - BPServiceActor actor) throws IOException { + InetSocketAddress nnSocketAddress) throws IOException { final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null; final BlockIdCommand blockIdCmd = @@ -768,7 +773,7 @@ assert getBlockPoolId().equals(bp) : dn.finalizeUpgradeForPool(bp); break; case DatanodeProtocol.DNA_RECOVERBLOCK: - String who = "NameNode at " + actor.getNNSocketAddress(); + String who = "NameNode at " + nnSocketAddress; dn.getBlockRecoveryWorker().recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks()); break; @@ -810,10 +815,11 @@ assert getBlockPoolId().equals(bp) : * DNA_REGISTER which should be handled earlier itself. */ private boolean processCommandFromStandby(DatanodeCommand cmd, - BPServiceActor actor) throws IOException { + InetSocketAddress nnSocketAddress) throws IOException { switch(cmd.getAction()) { case DatanodeProtocol.DNA_ACCESSKEYUPDATE: - LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE"); + LOG.info("DatanodeCommand action from standby NN {}: DNA_ACCESSKEYUPDATE", + nnSocketAddress); if (dn.isBlockTokenEnabled) { dn.blockPoolTokenSecretManager.addKeys( getBlockPoolId(), @@ -829,10 +835,12 @@ private boolean processCommandFromStandby(DatanodeCommand cmd, case DatanodeProtocol.DNA_CACHE: case DatanodeProtocol.DNA_UNCACHE: case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: - LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); + LOG.warn("Got a command from standby NN {} - ignoring command: {}", + nnSocketAddress, cmd.getAction()); break; default: - LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction()); + LOG.warn("Unknown DatanodeCommand action: {} from standby NN {}", + cmd.getAction(), nnSocketAddress); } return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 83f68b4370..71834f8965 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -1491,7 +1491,7 @@ private boolean processCommand(DatanodeCommand[] cmds) { dn.getMetrics().addNumProcessedCommands(processCommandsMs); } if (processCommandsMs > dnConf.getProcessCommandsThresholdMs()) { - LOG.info("Took {} ms to process {} commands from NN", + LOG.warn("Took {} ms to process {} commands from NN", processCommandsMs, cmds.length); } }