From 15e1789bafb5ca16e6d4e24e8319b7303bbf8194 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sat, 20 Jan 2024 07:51:55 +0800 Subject: [PATCH] Revert "HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)" (#6457) Contributed by Shilun Fan. This reverts commit c1bf3cb0. Reviewed-by: Takanobu Asanuma Reviewed-by: He Xiaoqiao Reviewed-by: Ayush Saxena Reviewed-by: Viraj Jasani Signed-off-by: Shilun Fan --- .../hdfs/server/datanode/BPServiceActor.java | 62 +++---------------- .../hadoop/hdfs/TestDatanodeReport.java | 17 +---- .../datanode/TestIncrementalBlockReports.java | 24 ++----- 3 files changed, 17 insertions(+), 86 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index b552fa277d..4bac0d8fb4 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -36,8 +36,6 @@ import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,7 +71,6 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.VersionInfo; @@ -103,8 +100,6 @@ class BPServiceActor implements Runnable { volatile long lastCacheReport = 0; private final Scheduler scheduler; - private final Object sendIBRLock; - private final ExecutorService ibrExecutorService; Thread bpThread; DatanodeProtocolClientSideTranslatorPB bpNamenode; @@ -161,10 +156,6 @@ enum RunningState { } commandProcessingThread = new CommandProcessingThread(this); commandProcessingThread.start(); - sendIBRLock = new Object(); - ibrExecutorService = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("ibr-executor-%d").build()); } public DatanodeRegistration getBpRegistration() { @@ -397,10 +388,8 @@ List blockReport(long fullBrLeaseId) throws IOException { // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. - synchronized (sendIBRLock) { - ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), getRpcMetricSuffix()); - } + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), getRpcMetricSuffix()); long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = @@ -633,9 +622,6 @@ void stop() { if (commandProcessingThread != null) { commandProcessingThread.interrupt(); } - if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) { - ibrExecutorService.shutdownNow(); - } } //This must be called only by blockPoolManager @@ -650,18 +636,13 @@ void join() { } catch (InterruptedException ie) { } } - // Cleanup method to be called by current thread before exiting. - // Any Thread / ExecutorService started by BPServiceActor can be shutdown - // here. + //Cleanup method to be called by current thread before exiting. private synchronized void cleanUp() { shouldServiceRun = false; IOUtils.cleanupWithLogger(null, bpNamenode); IOUtils.cleanupWithLogger(null, lifelineSender); bpos.shutdownActor(this); - if (!ibrExecutorService.isShutdown()) { - ibrExecutorService.shutdownNow(); - } } private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException { @@ -757,6 +738,11 @@ private void offerService() throws Exception { isSlownode = resp.getIsSlownode(); } } + if (!dn.areIBRDisabledForTests() && + (ibrManager.sendImmediately()|| sendHeartbeat)) { + ibrManager.sendIBRs(bpNamenode, bpRegistration, + bpos.getBlockPoolId(), getRpcMetricSuffix()); + } List cmds = null; boolean forceFullBr = @@ -923,10 +909,6 @@ public void run() { initialRegistrationComplete.countDown(); } - // IBR tasks to be handled separately from offerService() in order to - // improve performance of offerService(), which can now focus only on - // FBR and heartbeat. - ibrExecutorService.submit(new IBRTaskHandler()); while (shouldRun()) { try { offerService(); @@ -1159,34 +1141,6 @@ private void sendLifeline() throws IOException { } } - class IBRTaskHandler implements Runnable { - - @Override - public void run() { - LOG.info("Starting IBR Task Handler."); - while (shouldRun()) { - try { - final long startTime = scheduler.monotonicNow(); - final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime); - if (!dn.areIBRDisabledForTests() && - (ibrManager.sendImmediately() || sendHeartbeat)) { - synchronized (sendIBRLock) { - ibrManager.sendIBRs(bpNamenode, bpRegistration, - bpos.getBlockPoolId(), getRpcMetricSuffix()); - } - } - // There is no work to do; sleep until heartbeat timer elapses, - // or work arrives, and then iterate again. - ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime()); - } catch (Throwable t) { - LOG.error("Exception in IBRTaskHandler.", t); - sleepAndLogInterrupts(5000, "offering IBR service"); - } - } - } - - } - /** * Utility class that wraps the timestamp computations for scheduling * heartbeats and block reports. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java index 239555a8b0..a844e1727b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeReport.java @@ -172,19 +172,8 @@ public void testDatanodeReportMissingBlock() throws Exception { // all bad datanodes } cluster.triggerHeartbeats(); // IBR delete ack - int retries = 0; - while (true) { - lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0); - if (0 != lb.getLocations().length) { - retries++; - if (retries > 7) { - Assert.fail("getLocatedBlocks failed after 7 retries"); - } - Thread.sleep(2000); - } else { - break; - } - } + lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0); + assertEquals(0, lb.getLocations().length); } finally { cluster.shutdown(); } @@ -234,4 +223,4 @@ static DataNode findDatanode(String id, List datanodes) { throw new IllegalStateException("Datnode " + id + " not in datanode list: " + datanodes); } -} +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java index e848cbfb37..4221ecaf2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBlockReports.java @@ -25,7 +25,6 @@ import java.io.IOException; -import org.mockito.exceptions.base.MockitoAssertionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -157,7 +156,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException { // Sleep for a very short time since IBR is generated // asynchronously. - Thread.sleep(1000); + Thread.sleep(2000); // Ensure that no block report was generated immediately. // Deleted blocks are reported when the IBR timer elapses. @@ -168,24 +167,13 @@ public void testReportBlockDeleted() throws InterruptedException, IOException { // Trigger a heartbeat, this also triggers an IBR. DataNodeTestUtils.triggerHeartbeat(singletonDn); + Thread.sleep(2000); // Ensure that the deleted block is reported. - int retries = 0; - while (true) { - try { - Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted( - any(DatanodeRegistration.class), - anyString(), - any(StorageReceivedDeletedBlocks[].class)); - break; - } catch (MockitoAssertionError e) { - if (retries > 7) { - throw e; - } - retries++; - Thread.sleep(2000); - } - } + Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted( + any(DatanodeRegistration.class), + anyString(), + any(StorageReceivedDeletedBlocks[].class)); } finally { cluster.shutdown();