HDFS-16016. BPServiceActor to provide new thread to handle IBR (#2998)
Contributed by Viraj Jasani
This commit is contained in:
parent
e31d06032b
commit
c1bf3cb0da
@ -34,6 +34,8 @@
|
|||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -66,6 +68,7 @@
|
|||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
@ -94,6 +97,8 @@ class BPServiceActor implements Runnable {
|
|||||||
|
|
||||||
volatile long lastCacheReport = 0;
|
volatile long lastCacheReport = 0;
|
||||||
private final Scheduler scheduler;
|
private final Scheduler scheduler;
|
||||||
|
private final Object sendIBRLock;
|
||||||
|
private final ExecutorService ibrExecutorService;
|
||||||
|
|
||||||
Thread bpThread;
|
Thread bpThread;
|
||||||
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
DatanodeProtocolClientSideTranslatorPB bpNamenode;
|
||||||
@ -149,6 +154,10 @@ enum RunningState {
|
|||||||
}
|
}
|
||||||
commandProcessingThread = new CommandProcessingThread(this);
|
commandProcessingThread = new CommandProcessingThread(this);
|
||||||
commandProcessingThread.start();
|
commandProcessingThread.start();
|
||||||
|
sendIBRLock = new Object();
|
||||||
|
ibrExecutorService = Executors.newSingleThreadExecutor(
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true)
|
||||||
|
.setNameFormat("ibr-executor-%d").build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public DatanodeRegistration getBpRegistration() {
|
public DatanodeRegistration getBpRegistration() {
|
||||||
@ -368,8 +377,10 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
|
|||||||
// we have a chance that we will miss the delHint information
|
// we have a chance that we will miss the delHint information
|
||||||
// or we will report an RBW replica after the BlockReport already reports
|
// or we will report an RBW replica after the BlockReport already reports
|
||||||
// a FINALIZED one.
|
// a FINALIZED one.
|
||||||
|
synchronized (sendIBRLock) {
|
||||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||||
bpos.getBlockPoolId(), getRpcMetricSuffix());
|
bpos.getBlockPoolId(), getRpcMetricSuffix());
|
||||||
|
}
|
||||||
|
|
||||||
long brCreateStartTime = monotonicNow();
|
long brCreateStartTime = monotonicNow();
|
||||||
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
|
||||||
@ -600,6 +611,9 @@ void stop() {
|
|||||||
if (commandProcessingThread != null) {
|
if (commandProcessingThread != null) {
|
||||||
commandProcessingThread.interrupt();
|
commandProcessingThread.interrupt();
|
||||||
}
|
}
|
||||||
|
if (ibrExecutorService != null && !ibrExecutorService.isShutdown()) {
|
||||||
|
ibrExecutorService.shutdownNow();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//This must be called only by blockPoolManager
|
//This must be called only by blockPoolManager
|
||||||
@ -614,13 +628,18 @@ void join() {
|
|||||||
} catch (InterruptedException ie) { }
|
} catch (InterruptedException ie) { }
|
||||||
}
|
}
|
||||||
|
|
||||||
//Cleanup method to be called by current thread before exiting.
|
// Cleanup method to be called by current thread before exiting.
|
||||||
|
// Any Thread / ExecutorService started by BPServiceActor can be shutdown
|
||||||
|
// here.
|
||||||
private synchronized void cleanUp() {
|
private synchronized void cleanUp() {
|
||||||
|
|
||||||
shouldServiceRun = false;
|
shouldServiceRun = false;
|
||||||
IOUtils.cleanupWithLogger(null, bpNamenode);
|
IOUtils.cleanupWithLogger(null, bpNamenode);
|
||||||
IOUtils.cleanupWithLogger(null, lifelineSender);
|
IOUtils.cleanupWithLogger(null, lifelineSender);
|
||||||
bpos.shutdownActor(this);
|
bpos.shutdownActor(this);
|
||||||
|
if (!ibrExecutorService.isShutdown()) {
|
||||||
|
ibrExecutorService.shutdownNow();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
|
private void handleRollingUpgradeStatus(HeartbeatResponse resp) throws IOException {
|
||||||
@ -706,11 +725,6 @@ private void offerService() throws Exception {
|
|||||||
commandProcessingThread.enqueue(resp.getCommands());
|
commandProcessingThread.enqueue(resp.getCommands());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!dn.areIBRDisabledForTests() &&
|
|
||||||
(ibrManager.sendImmediately()|| sendHeartbeat)) {
|
|
||||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
|
||||||
bpos.getBlockPoolId(), getRpcMetricSuffix());
|
|
||||||
}
|
|
||||||
|
|
||||||
List<DatanodeCommand> cmds = null;
|
List<DatanodeCommand> cmds = null;
|
||||||
boolean forceFullBr =
|
boolean forceFullBr =
|
||||||
@ -874,6 +888,10 @@ public void run() {
|
|||||||
initialRegistrationComplete.countDown();
|
initialRegistrationComplete.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IBR tasks to be handled separately from offerService() in order to
|
||||||
|
// improve performance of offerService(), which can now focus only on
|
||||||
|
// FBR and heartbeat.
|
||||||
|
ibrExecutorService.submit(new IBRTaskHandler());
|
||||||
while (shouldRun()) {
|
while (shouldRun()) {
|
||||||
try {
|
try {
|
||||||
offerService();
|
offerService();
|
||||||
@ -1104,6 +1122,34 @@ private void sendLifeline() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class IBRTaskHandler implements Runnable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("Starting IBR Task Handler.");
|
||||||
|
while (shouldRun()) {
|
||||||
|
try {
|
||||||
|
final long startTime = scheduler.monotonicNow();
|
||||||
|
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
|
||||||
|
if (!dn.areIBRDisabledForTests() &&
|
||||||
|
(ibrManager.sendImmediately() || sendHeartbeat)) {
|
||||||
|
synchronized (sendIBRLock) {
|
||||||
|
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||||
|
bpos.getBlockPoolId(), getRpcMetricSuffix());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// There is no work to do; sleep until heartbeat timer elapses,
|
||||||
|
// or work arrives, and then iterate again.
|
||||||
|
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("Exception in IBRTaskHandler.", t);
|
||||||
|
sleepAndLogInterrupts(5000, "offering IBR service");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utility class that wraps the timestamp computations for scheduling
|
* Utility class that wraps the timestamp computations for scheduling
|
||||||
* heartbeats and block reports.
|
* heartbeats and block reports.
|
||||||
|
@ -172,8 +172,19 @@ public void testDatanodeReportMissingBlock() throws Exception {
|
|||||||
// all bad datanodes
|
// all bad datanodes
|
||||||
}
|
}
|
||||||
cluster.triggerHeartbeats(); // IBR delete ack
|
cluster.triggerHeartbeats(); // IBR delete ack
|
||||||
|
int retries = 0;
|
||||||
|
while (true) {
|
||||||
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
|
lb = fs.getClient().getLocatedBlocks(p.toString(), 0).get(0);
|
||||||
assertEquals(0, lb.getLocations().length);
|
if (0 != lb.getLocations().length) {
|
||||||
|
retries++;
|
||||||
|
if (retries > 7) {
|
||||||
|
Assert.fail("getLocatedBlocks failed after 7 retries");
|
||||||
|
}
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.mockito.exceptions.base.MockitoAssertionError;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -156,7 +157,7 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
|
|||||||
|
|
||||||
// Sleep for a very short time since IBR is generated
|
// Sleep for a very short time since IBR is generated
|
||||||
// asynchronously.
|
// asynchronously.
|
||||||
Thread.sleep(2000);
|
Thread.sleep(1000);
|
||||||
|
|
||||||
// Ensure that no block report was generated immediately.
|
// Ensure that no block report was generated immediately.
|
||||||
// Deleted blocks are reported when the IBR timer elapses.
|
// Deleted blocks are reported when the IBR timer elapses.
|
||||||
@ -167,13 +168,24 @@ public void testReportBlockDeleted() throws InterruptedException, IOException {
|
|||||||
|
|
||||||
// Trigger a heartbeat, this also triggers an IBR.
|
// Trigger a heartbeat, this also triggers an IBR.
|
||||||
DataNodeTestUtils.triggerHeartbeat(singletonDn);
|
DataNodeTestUtils.triggerHeartbeat(singletonDn);
|
||||||
Thread.sleep(2000);
|
|
||||||
|
|
||||||
// Ensure that the deleted block is reported.
|
// Ensure that the deleted block is reported.
|
||||||
Mockito.verify(nnSpy, times(1)).blockReceivedAndDeleted(
|
int retries = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
Mockito.verify(nnSpy, atLeastOnce()).blockReceivedAndDeleted(
|
||||||
any(DatanodeRegistration.class),
|
any(DatanodeRegistration.class),
|
||||||
anyString(),
|
anyString(),
|
||||||
any(StorageReceivedDeletedBlocks[].class));
|
any(StorageReceivedDeletedBlocks[].class));
|
||||||
|
break;
|
||||||
|
} catch (MockitoAssertionError e) {
|
||||||
|
if (retries > 7) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
retries++;
|
||||||
|
Thread.sleep(2000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
Loading…
Reference in New Issue
Block a user