HDFS-16139. Update BPServiceActor Scheduler's nextBlockReportTime atomically (#3228). Contributed by Viraj Jasani.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
(cherry picked from commit b038042ece)
This commit is contained in:
Viraj Jasani 2021-07-27 12:27:12 +05:30 committed by Takanobu Asanuma
parent 1c71d6e9fe
commit 831c11c47a
2 changed files with 40 additions and 25 deletions

View File

@ -37,6 +37,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -309,10 +310,10 @@ private void connectToNNAndHandshake() throws IOException {
void triggerBlockReportForTests() {
synchronized (ibrManager) {
scheduler.scheduleHeartbeat();
long oldBlockReportTime = scheduler.nextBlockReportTime;
long oldBlockReportTime = scheduler.getNextBlockReportTime();
scheduler.forceFullBlockReportNow();
ibrManager.notifyAll();
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
while (oldBlockReportTime == scheduler.getNextBlockReportTime()) {
try {
ibrManager.wait(100);
} catch (InterruptedException e) {
@ -1112,8 +1113,8 @@ static class Scheduler {
// nextBlockReportTime and nextHeartbeatTime may be assigned/read
// by testing threads (through BPServiceActor#triggerXXX), while also
// assigned/read by the actor thread.
@VisibleForTesting
volatile long nextBlockReportTime = monotonicNow();
private final AtomicLong nextBlockReportTime =
new AtomicLong(monotonicNow());
@VisibleForTesting
volatile long nextHeartbeatTime = monotonicNow();
@ -1206,7 +1207,7 @@ boolean isLifelineDue(long startTime) {
}
boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
return nextBlockReportTime.get() - curTime <= 0;
}
boolean isOutliersReportDue(long curTime) {
@ -1231,15 +1232,15 @@ void forceFullBlockReportNow() {
long scheduleBlockReport(long delay, boolean isRegistration) {
if (delay > 0) { // send BR after random delay
// Numerical overflow is possible here and is okay.
nextBlockReportTime =
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay));
nextBlockReportTime.getAndSet(
monotonicNow() + ThreadLocalRandom.current().nextInt((int) (delay)));
} else { // send at next heartbeat
nextBlockReportTime = monotonicNow();
nextBlockReportTime.getAndSet(monotonicNow());
}
resetBlockReportTime = isRegistration; // reset future BRs for
// randomness, post first block report to avoid regular BRs from all
// DN's coming at one time.
return nextBlockReportTime;
return nextBlockReportTime.get();
}
/**
@ -1252,8 +1253,8 @@ void scheduleNextBlockReport() {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
nextBlockReportTime = monotonicNow() +
ThreadLocalRandom.current().nextInt((int)(blockReportIntervalMs));
nextBlockReportTime.getAndSet(monotonicNow() +
ThreadLocalRandom.current().nextInt((int) (blockReportIntervalMs)));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
@ -1263,17 +1264,16 @@ void scheduleNextBlockReport() {
* 2) unexpected like 21:35:43, next report should be at 2:20:14
* on the next day.
*/
long factor =
(monotonicNow() - nextBlockReportTime + blockReportIntervalMs)
/ blockReportIntervalMs;
long factor = (monotonicNow() - nextBlockReportTime.get()
+ blockReportIntervalMs) / blockReportIntervalMs;
if (factor != 0) {
nextBlockReportTime += factor * blockReportIntervalMs;
nextBlockReportTime.getAndAdd(factor * blockReportIntervalMs);
} else {
// If the difference between the present time and the scheduled
// time is very less, the factor can be 0, so in that case, we can
// ignore that negligible time, spent while sending the BRss and
// schedule the next BR after the blockReportInterval.
nextBlockReportTime += blockReportIntervalMs;
nextBlockReportTime.getAndAdd(blockReportIntervalMs);
}
}
}
@ -1286,6 +1286,16 @@ long getLifelineWaitTime() {
return nextLifelineTime - monotonicNow();
}
@VisibleForTesting
long getNextBlockReportTime() {
return nextBlockReportTime.get();
}
@VisibleForTesting
void setNextBlockReportTime(long nextBlockReportTime) {
this.nextBlockReportTime.getAndSet(nextBlockReportTime);
}
/**
* Wrapped for testing.
* @return

View File

@ -31,6 +31,7 @@
import static java.lang.Math.abs;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@ -70,7 +71,7 @@ public void testScheduleBlockReportImmediate() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.scheduleBlockReport(0, true);
assertTrue(scheduler.resetBlockReportTime);
assertThat(scheduler.nextBlockReportTime, is(now));
assertThat(scheduler.getNextBlockReportTime(), is(now));
}
}
@ -81,8 +82,8 @@ public void testScheduleBlockReportDelayed() {
final long delayMs = 10;
scheduler.scheduleBlockReport(delayMs, true);
assertTrue(scheduler.resetBlockReportTime);
assertTrue(scheduler.nextBlockReportTime - now >= 0);
assertTrue(scheduler.nextBlockReportTime - (now + delayMs) < 0);
assertTrue(scheduler.getNextBlockReportTime() - now >= 0);
assertTrue(scheduler.getNextBlockReportTime() - (now + delayMs) < 0);
}
}
@ -96,7 +97,8 @@ public void testScheduleNextBlockReport() {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.resetBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - (now + BLOCK_REPORT_INTERVAL_MS) < 0);
assertTrue(scheduler.getNextBlockReportTime()
- (now + BLOCK_REPORT_INTERVAL_MS) < 0);
}
}
@ -110,7 +112,8 @@ public void testScheduleNextBlockReport2() {
Scheduler scheduler = makeMockScheduler(now);
scheduler.resetBlockReportTime = false;
scheduler.scheduleNextBlockReport();
assertThat(scheduler.nextBlockReportTime, is(now + BLOCK_REPORT_INTERVAL_MS));
assertThat(scheduler.getNextBlockReportTime(),
is(now + BLOCK_REPORT_INTERVAL_MS));
}
}
@ -129,10 +132,12 @@ public void testScheduleNextBlockReport3() {
final long blockReportDelay =
BLOCK_REPORT_INTERVAL_MS + random.nextInt(2 * (int) BLOCK_REPORT_INTERVAL_MS);
final long origBlockReportTime = now - blockReportDelay;
scheduler.nextBlockReportTime = origBlockReportTime;
scheduler.setNextBlockReportTime(origBlockReportTime);
scheduler.scheduleNextBlockReport();
assertTrue(scheduler.nextBlockReportTime - now < BLOCK_REPORT_INTERVAL_MS);
assertTrue(((scheduler.nextBlockReportTime - origBlockReportTime) % BLOCK_REPORT_INTERVAL_MS) == 0);
assertTrue((scheduler.getNextBlockReportTime() - now)
< BLOCK_REPORT_INTERVAL_MS);
assertEquals(0, ((scheduler.getNextBlockReportTime() - origBlockReportTime)
% BLOCK_REPORT_INTERVAL_MS));
}
}
@ -218,7 +223,7 @@ private Scheduler makeMockScheduler(long now) {
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.setNextBlockReportTime(now);
mockScheduler.nextHeartbeatTime = now;
mockScheduler.nextOutliersReportTime = now;
return mockScheduler;