Revert "HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)" (#6457) Contributed by Shilun Fan.

This reverts commit c1bf3cb0.

Reviewed-by: Takanobu Asanuma <tasanuma@apache.org>
Reviewed-by: He Xiaoqiao <hexiaoqiao@apache.org>
Reviewed-by: Ayush Saxena <ayushsaxena@apache.org>
Reviewed-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
slfan1989 2024-01-20 07:51:55 +08:00 committed by GitHub
parent d0df0689b4
commit 15e1789baf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 17 additions and 86 deletions

View File

@ -36,8 +36,6 @@
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -73,7 +71,6 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
@ -103,8 +100,6 @@ class BPServiceActor implements Runnable {
volatile long lastCacheReport = 0; volatile long lastCacheReport = 0;
private final Scheduler scheduler; private final Scheduler scheduler;
private final Object sendIBRLock;
private final ExecutorService ibrExecutorService;
Thread bpThread; Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode; DatanodeProtocolClientSideTranslatorPB bpNamenode;
@ -161,10 +156,6 @@ enum RunningState {
} }
commandProcessingThread = new CommandProcessingThread(this); commandProcessingThread = new CommandProcessingThread(this);
commandProcessingThread.start(); commandProcessingThread.start();
sendIBRLock = new Object();
ibrExecutorService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ibr-executor-%d").build());
} }
public DatanodeRegistration getBpRegistration() { public DatanodeRegistration getBpRegistration() {
@ -397,10 +388,8 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
// we have a chance that we will miss the delHint information // we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports // or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one. // a FINALIZED one.
synchronized (sendIBRLock) { ibrManager.sendIBRs(bpNamenode, bpRegistration,
ibrManager.sendIBRs(bpNamenode, bpRegistration, bpos.getBlockPoolId(), getRpcMetricSuffix());
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
long brCreateStartTime = monotonicNow(); long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists = Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@ -633,9 +622,6 @@ void stop() {
if (commandProcessingThread != null) { if (commandProcessingThread != null) {
commandProcessingThread.interrupt(); commandProcessingThread.interrupt();
} }
if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
ibrExecutorService.shutdownNow();
}
} }
//This must be called only by blockPoolManager //This must be called only by blockPoolManager
@ -650,18 +636,13 @@ void join() {
} catch (InterruptedException ie) { } } catch (InterruptedException ie) { }
} }
// Cleanup method to be called by current thread before exiting. //Cleanup method to be called by current thread before exiting.
// Any Thread / ExecutorService started by BPServiceActor can be shutdown
// here.
private synchronized void cleanUp() { private synchronized void cleanUp() {
shouldServiceRun = false; shouldServiceRun = false;
IOUtils.cleanupWithLogger(null, bpNamenode); IOUtils.cleanupWithLogger(null, bpNamenode);
IOUtils.cleanupWithLogger(null, lifelineSender); IOUtils.cleanupWithLogger(null, lifelineSender);
bpos.shutdownActor(this); bpos.shutdownActor(this);
if (!ibrExecutorService.isShutdown()) {
ibrExecutorService.shutdownNow();
}
} }
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException { private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
@ -757,6 +738,11 @@ private void offerService() throws Exception {
isSlownode = resp.getIsSlownode(); isSlownode = resp.getIsSlownode();
} }
} }
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately()|| sendHeartbeat)) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
List<DatanodeCommand> cmds = null; List<DatanodeCommand> cmds = null;
boolean forceFullBr = boolean forceFullBr =
@ -923,10 +909,6 @@ public void run() {
initialRegistrationComplete.countDown(); initialRegistrationComplete.countDown();
} }
// IBR tasks to be handled separately from offerService() in order to
// improve performance of offerService(), which can now focus only on
// FBR and heartbeat.
ibrExecutorService.submit(new IBRTaskHandler());
while (shouldRun()) { while (shouldRun()) {
try { try {
offerService(); offerService();
@ -1159,34 +1141,6 @@ private void sendLifeline() throws IOException {
} }
} }
/**
 * Background task that sends incremental block reports (IBRs) in a loop for
 * as long as {@code shouldRun()} returns true.
 *
 * Each iteration sends pending IBRs via {@code ibrManager.sendIBRs(...)} when
 * IBRs are not disabled for tests and either the IBR manager asks for an
 * immediate send or a heartbeat is due, and then waits on the IBR manager
 * before iterating again. Any {@link Throwable} raised by an iteration is
 * logged and followed by a back-off sleep, so a failure does not end the loop.
 *
 * In this change the task is submitted to {@code ibrExecutorService} from the
 * actor's {@code run()} method.
 */
class IBRTaskHandler implements Runnable {
@Override
public void run() {
LOG.info("Starting IBR Task Handler.");
while (shouldRun()) {
try {
// Sample the scheduler clock once and use that value for the
// heartbeat-due check of this iteration.
final long startTime = scheduler.monotonicNow();
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
// Short-circuit order matters: sendImmediately() is only consulted when
// IBRs are not disabled for tests.
if (!dn.areIBRDisabledForTests() &&
(ibrManager.sendImmediately() || sendHeartbeat)) {
// blockReport() wraps its own sendIBRs call in the same sendIBRLock
// monitor, so the two send paths do not run concurrently.
synchronized (sendIBRLock) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), getRpcMetricSuffix());
}
}
// There is no work to do; sleep until heartbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
} catch (Throwable t) {
// Catch-all so that one failed iteration does not terminate the task.
// NOTE(review): 5000 is presumably a delay in milliseconds; confirm
// against sleepAndLogInterrupts.
LOG.error("Exception in IBRTaskHandler.", t);
sleepAndLogInterrupts(5000, "offering IBR service");
}
}
}
}
/** /**
* Utility class that wraps the timestamp computations for scheduling * Utility class that wraps the timestamp computations for scheduling
* heartbeats and block reports. * heartbeats and block reports.

View File

@ -172,19 +172,8 @@ public void testDatanodeReportMissingBlock() throws Exception {
// all bad datanodes // all bad datanodes
} }
cluster.triggerHeartbeats(); // IBR delete ack cluster.triggerHeartbeats(); // IBR delete ack
int retries = 0; lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
while (true) { assertEquals(0, lb.getLocations().length);
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
if (0 != lb.getLocations().length) {
retries++;
if (retries > 7) {
Assert.fail("getLocatedBlocks failed after 7 retries");
}
Thread.sleep(2000);
} else {
break;
}
}
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }
@ -234,4 +223,4 @@ static DataNode findDatanode(String id, List<DataNode> datanodes) {
throw new IllegalStateException("Datnode " + id + " not in datanode list: " throw new IllegalStateException("Datnode " + id + " not in datanode list: "
+ datanodes); + datanodes);
} }
} }

View File

@ -25,7 +25,6 @@
import java.io.IOException; import java.io.IOException;
import org.mockito.exceptions.base.MockitoAssertionError;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -157,7 +156,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
// Sleep for a very short time since IBR is generated // Sleep for a very short time since IBR is generated
// asynchronously. // asynchronously.
Thread.sleep(1000); Thread.sleep(2000);
// Ensure that no block report was generated immediately. // Ensure that no block report was generated immediately.
// Deleted blocks are reported when the IBR timer elapses. // Deleted blocks are reported when the IBR timer elapses.
@ -168,24 +167,13 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
// Trigger a heartbeat, this also triggers an IBR. // Trigger a heartbeat, this also triggers an IBR.
DataNodeTestUtils.triggerHeartbeat(singletonDn); DataNodeTestUtils.triggerHeartbeat(singletonDn);
Thread.sleep(2000);
// Ensure that the deleted block is reported. // Ensure that the deleted block is reported.
int retries = 0; Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
while (true) { any(DatanodeRegistration.class),
try { anyString(),
Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted( any(StorageReceivedDeletedBlocks[].class));
any(DatanodeRegistration.class),
anyString(),
any(StorageReceivedDeletedBlocks[].class));
break;
} catch (MockitoAssertionError e) {
if (retries > 7) {
throw e;
}
retries++;
Thread.sleep(2000);
}
}
} finally { } finally {
cluster.shutdown(); cluster.shutdown();