From b86895485d95440de099831e0db38db037f16bdd Mon Sep 17 00:00:00 2001
From: Inigo Goiri
Date: Thu, 19 Dec 2019 09:34:43 -0800
Subject: [PATCH] HDFS-14997. BPServiceActor processes commands from NameNode
 asynchronously. Contributed by Xiaoqiao He.

---
 .../hdfs/server/datanode/BPServiceActor.java  | 143 +++++++++++++-----
 .../datanode/metrics/DataNodeMetrics.java     |  12 ++
 .../server/datanode/TestBPOfferService.java   |  36 ++++-
 3 files changed, 154 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 495035e3f9..13959a3a63 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -117,6 +119,7 @@ enum RunningState {
   private DatanodeRegistration bpRegistration;
   final LinkedList<BPServiceActorAction> bpThreadQueue =
       new LinkedList<BPServiceActorAction>();
+  private final CommandProcessingThread commandProcessingThread;
 
   BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
       InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
@@ -144,6 +147,8 @@ enum RunningState {
     if (nnId != null) {
       this.nnId = nnId;
     }
+    commandProcessingThread = new CommandProcessingThread(this);
+    commandProcessingThread.start();
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -696,8 +701,7 @@ private void offerService() throws Exception {
         }
 
         long startProcessCommands = monotonicNow();
-        if (!processCommand(resp.getCommands()))
-          continue;
+        commandProcessingThread.enqueue(resp.getCommands());
         long endProcessCommands = monotonicNow();
         if (endProcessCommands - startProcessCommands > 2000) {
           LOG.info("Took " + (endProcessCommands - startProcessCommands)
@@ -722,11 +726,11 @@ private void offerService() throws Exception {
           cmds = blockReport(fullBlockReportLeaseId);
           fullBlockReportLeaseId = 0;
         }
-        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
+        commandProcessingThread.enqueue(cmds);
 
         if (!dn.areCacheReportsDisabledForTests()) {
           DatanodeCommand cmd = cacheReport();
-          processCommand(new DatanodeCommand[]{ cmd });
+          commandProcessingThread.enqueue(cmd);
         }
 
         if (sendHeartbeat) {
@@ -900,37 +904,6 @@ private boolean shouldRun() {
     return shouldServiceRun && dn.shouldRun();
   }
 
-  /**
-   * Process an array of datanode commands
-   *
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise.
-   */
-  boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (bpos.processCommandFromActor(cmd, this) == false) {
-            return false;
-          }
-        } catch (RemoteException re) {
-          String reClass = re.getClassName();
-          if (UnregisteredNodeException.class.getName().equals(reClass) ||
-              DisallowedDatanodeException.class.getName().equals(reClass) ||
-              IncorrectVersionException.class.getName().equals(reClass)) {
-            LOG.warn(this + " is shutting down", re);
-            shouldServiceRun = false;
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
-    }
-    return true;
-  }
-
-
   /**
    * Report a bad block from another DN in this cluster.
    */
@@ -1304,4 +1277,102 @@ public long monotonicNow() {
       return Time.monotonicNow();
     }
   }
-}
+
+  /**
+   * CommandProcessingThread that processes commands asynchronously.
+   */
+  class CommandProcessingThread extends Thread {
+    private final BPServiceActor actor;
+    private final BlockingQueue<Runnable> queue;
+
+    CommandProcessingThread(BPServiceActor actor) {
+      super("Command processor");
+      this.actor = actor;
+      this.queue = new LinkedBlockingQueue<>();
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        processQueue();
+      } catch (Throwable t) {
+        LOG.error("{} encountered fatal exception and is exiting.", getName(), t);
+      }
+    }
+
+    /**
+     * Process queued commands one by one, blocking while the queue is empty.
+     */
+    private void processQueue() {
+      while (shouldRun()) {
+        try {
+          Runnable action = queue.take();
+          action.run();
+          dn.getMetrics().incrActorCmdQueueLength(-1);
+          dn.getMetrics().incrNumProcessedCommands();
+        } catch (InterruptedException e) {
+          LOG.error("{} encountered interrupt and is exiting.", getName());
+          // ignore unless thread was specifically interrupted.
+          if (Thread.interrupted()) {
+            break;
+          }
+        }
+      }
+      dn.getMetrics().incrActorCmdQueueLength(-1 * queue.size());
+      queue.clear();
+    }
+
+    /**
+     * Process an array of datanode commands.
+     *
+     * @param cmds an array of datanode commands
+     * @return true if further processing may be required or false otherwise.
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          try {
+            if (!bpos.processCommandFromActor(cmd, actor)) {
+              return false;
+            }
+          } catch (RemoteException re) {
+            String reClass = re.getClassName();
+            if (UnregisteredNodeException.class.getName().equals(reClass) ||
+                DisallowedDatanodeException.class.getName().equals(reClass) ||
+                IncorrectVersionException.class.getName().equals(reClass)) {
+              LOG.warn("{} is shutting down", this, re);
+              shouldServiceRun = false;
+              return false;
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Error processing datanode Command", ioe);
+          }
+        }
+      }
+      return true;
+    }
+
+    void enqueue(DatanodeCommand cmd) throws InterruptedException {
+      if (cmd == null) {
+        return;
+      }
+      queue.put(() -> processCommand(new DatanodeCommand[]{cmd}));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(List<DatanodeCommand> cmds) throws InterruptedException {
+      if (cmds == null) {
+        return;
+      }
+      queue.put(() -> processCommand(
+          cmds.toArray(new DatanodeCommand[cmds.size()])));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+
+    void enqueue(DatanodeCommand[] cmds) throws InterruptedException {
+      queue.put(() -> processCommand(cmds));
+      dn.getMetrics().incrActorCmdQueueLength(1);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 89cd1cac3c..ffd0b7b718 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -160,6 +160,10 @@ public class DataNodeMetrics {
   private MutableCounterLong ecReconstructionDecodingTimeMillis;
   @Metric("Milliseconds spent on write by erasure coding worker")
   private MutableCounterLong ecReconstructionWriteTimeMillis;
+  @Metric("Sum of all BPServiceActors command queue length")
+  private MutableCounterLong sumOfActorCommandQueueLength;
+  @Metric("Num of processed commands of all BPServiceActors")
+  private MutableCounterLong numProcessedCommands;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   @Metric("Milliseconds spent on calling NN rpc")
@@ -552,4 +556,12 @@ public DataNodeUsageReport getDNUsageReport(long timeSinceLastReport) {
         .value(), totalWriteTime.value(), totalReadTime.value(),
         blocksWritten.value(), blocksRead.value(), timeSinceLastReport);
   }
+
+  public void incrActorCmdQueueLength(int delta) {
+    sumOfActorCommandQueueLength.incr(delta);
+  }
+
+  public void incrNumProcessedCommands() {
+    numProcessedCommands.incr();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 2353992340..2e8cf24246 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -18,7 +18,15 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
+
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
@@ -36,6 +44,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1045,4 +1054,29 @@ public Object answer(InvocationOnMock invocation)
       bpos.stop();
     }
   }
-}
+
+  @Test(timeout = 15000)
+  public void testCommandProcessingThread() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(1, datanodes.size());
+      DataNode datanode = datanodes.get(0);
+
+      // Write a file to trigger the NN to send commands back to the DataNode.
+      FileSystem fs = cluster.getFileSystem();
+      Path file = new Path("/test");
+      DFSTestUtil.createFile(fs, file, 10240L, (short)1, 0L);
+
+      MetricsRecordBuilder mrb = getMetrics(datanode.getMetrics().name());
+      assertTrue("Processed command count is not as expected.",
+          getLongCounter("NumProcessedCommands", mrb) > 0);
+      assertEquals(0, getLongCounter("SumOfActorCommandQueueLength", mrb));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
\ No newline at end of file
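
Note on the pattern for reviewers: the patch moves command handling off the
heartbeat path. offerService() no longer runs DatanodeCommands inline; it wraps
them as Runnables, puts them on an unbounded LinkedBlockingQueue, and a single
daemon CommandProcessingThread drains that queue, so a slow or blocking command
can no longer delay the next heartbeat. A minimal, self-contained sketch of the
same producer/consumer shape (all names below are illustrative, not HDFS APIs):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncCommandProcessor {
  // Unbounded queue, mirroring the patch's LinkedBlockingQueue<Runnable>.
  private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
  private final Thread worker;
  private volatile boolean running = true;

  public AsyncCommandProcessor() {
    worker = new Thread(() -> {
      while (running) {
        try {
          // Blocks while the queue is empty, like queue.take() in the patch.
          queue.take().run();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }, "Command processor");
    // Daemon, as in the patch: this thread must not keep the JVM alive.
    worker.setDaemon(true);
    worker.start();
  }

  // Producer side: hand off work without blocking the caller's loop.
  public void enqueue(Runnable command) throws InterruptedException {
    queue.put(command);
  }

  public void shutdown() {
    running = false;
    worker.interrupt();
  }

  public static void main(String[] args) throws Exception {
    AsyncCommandProcessor p = new AsyncCommandProcessor();
    p.enqueue(() -> System.out.println("processing command"));
    Thread.sleep(100); // give the daemon worker time to run the command
    p.shutdown();
  }
}

Because enqueue() returns as soon as the put succeeds, the
startProcessCommands/endProcessCommands timing left in offerService() now
measures only the hand-off. The backlog itself is what the new
SumOfActorCommandQueueLength and NumProcessedCommands counters in
DataNodeMetrics.java make observable, and the added test asserts exactly that:
commands were processed and the queue drained back to zero.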