HDFS-16143. Add Timer in EditLogTailer and de-flake TestEditLogTailer#testStandbyTriggersLogRollsWhenTailInProgressEdits (#3235)
Contributed by Viraj Jasani.

Signed-off-by: Mingliang Liu <liuml07@apache.org>
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
parent b53cae0ffb
commit aa9cdf2af6
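What the patch does, in brief: EditLogTailer previously read time through the static Time.monotonicNow()/monotonicNow() helpers, so the only way for the test to reach the "log roll overdue" condition was to really wait, which made testStandbyTriggersLogRollsWhenTailInProgressEdits timing-sensitive and flaky. The change routes every time read through an injected org.apache.hadoop.util.Timer field, adds a FakeTimer(long) constructor, and gives the test a setTimerForTest()/getTimer() hook so it can swap in a FakeTimer and advance it deterministically. Below is a minimal, self-contained sketch of that injectable-clock pattern, not code from the patch: the LogRoller and InjectableClockSketch names and the rollDue() check are invented for illustration, and it assumes hadoop-common (which provides Timer and FakeTimer) is on the classpath.

import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.Timer;

public class InjectableClockSketch {

  // Hypothetical class for illustration only (not part of the patch); it
  // reads time exclusively through the injected Timer, the way EditLogTailer
  // does after this change.
  static class LogRoller {
    private final long rollPeriodMs;
    private final long lastRollTimeMs;
    private Timer timer = new Timer();  // real clock in production

    LogRoller(long rollPeriodMs) {
      this.rollPeriodMs = rollPeriodMs;
      this.lastRollTimeMs = timer.monotonicNow();
    }

    // Test-only hook, mirroring EditLogTailer#setTimerForTest in the patch.
    void setTimerForTest(Timer newTimer) {
      this.timer = newTimer;
    }

    Timer getTimer() {
      return timer;
    }

    boolean rollDue() {
      return (timer.monotonicNow() - lastRollTimeMs) > rollPeriodMs;
    }
  }

  public static void main(String[] args) {
    long rollPeriodMs = 5_000;
    LogRoller roller = new LogRoller(rollPeriodMs);

    // Seed the fake clock near the real clock's current reading and swap it
    // in, the same way the de-flaked test uses the new FakeTimer(long)
    // constructor.
    long curTime = roller.getTimer().monotonicNow();
    FakeTimer fakeTimer = new FakeTimer(curTime + rollPeriodMs / 3);
    roller.setTimerForTest(fakeTimer);

    System.out.println(roller.rollDue());  // false: only a third of the period has "passed"
    fakeTimer.advance(rollPeriodMs * 3);   // deterministically move well past the period
    System.out.println(roller.rollDue());  // true: a roll is now due
  }
}

The diff below applies the same idea: production code constructs its own Timer in the EditLogTailer constructor, while the test seeds a FakeTimer from the tailer's current clock and advances it well past the log-roll period, so the roll condition no longer depends on how long the surrounding test steps actually take.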
@@ -39,6 +39,16 @@ public FakeTimer() {
     nowNanos = TimeUnit.MILLISECONDS.toNanos(1000);
   }
 
+  /**
+   * FakeTimer constructor with milliseconds to keep as initial value.
+   *
+   * @param time time in millis.
+   */
+  public FakeTimer(long time) {
+    now = time;
+    nowNanos = TimeUnit.MILLISECONDS.toNanos(time);
+  }
+
   @Override
   public long now() {
     return now;
@@ -36,6 +36,7 @@
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Timer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,12 +56,10 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.SecurityUtil;
 
-import static org.apache.hadoop.util.Time.monotonicNow;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.util.Time;
 
 
 /**
@@ -172,14 +171,21 @@ public class EditLogTailer {
    */
   private final long maxTxnsPerLock;
 
+  /**
+   * Timer instance to be set only using constructor.
+   * Only tests can reassign this by using setTimerForTests().
+   * For source code, this timer instance should be treated as final.
+   */
+  private Timer timer;
 
   public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
     this.conf = conf;
     this.namesystem = namesystem;
+    this.timer = new Timer();
     this.editLog = namesystem.getEditLog();
-    lastLoadTimeMs = monotonicNow();
-    lastRollTimeMs = monotonicNow();
+    this.lastLoadTimeMs = timer.monotonicNow();
+    this.lastRollTimeMs = timer.monotonicNow();
 
     logRollPeriodMs = conf.getTimeDuration(
         DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
@@ -301,7 +307,7 @@ public Void run() throws Exception {
         long editsTailed = 0;
         // Fully tail the journal to the end
         do {
-          long startTime = Time.monotonicNow();
+          long startTime = timer.monotonicNow();
           try {
             NameNode.getNameNodeMetrics().addEditLogTailInterval(
                 startTime - lastLoadTimeMs);
@@ -312,7 +318,7 @@ public Void run() throws Exception {
             throw new IOException(e);
           } finally {
             NameNode.getNameNodeMetrics().addEditLogTailTime(
-                Time.monotonicNow() - startTime);
+                timer.monotonicNow() - startTime);
           }
         } while(editsTailed > 0);
         return null;
@@ -336,7 +342,7 @@ public long doTailEdits() throws IOException, InterruptedException {
       LOG.debug("lastTxnId: " + lastTxnId);
     }
     Collection<EditLogInputStream> streams;
-    long startTime = Time.monotonicNow();
+    long startTime = timer.monotonicNow();
     try {
       streams = editLog.selectInputStreams(lastTxnId + 1, 0,
           null, inProgressOk, true);
@@ -349,7 +355,7 @@ public long doTailEdits() throws IOException, InterruptedException {
       return 0;
     } finally {
       NameNode.getNameNodeMetrics().addEditLogFetchTime(
-          Time.monotonicNow() - startTime);
+          timer.monotonicNow() - startTime);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("edit streams to load from: " + streams.size());
@@ -374,7 +380,7 @@ public long doTailEdits() throws IOException, InterruptedException {
       }
 
       if (editsLoaded > 0) {
-        lastLoadTimeMs = monotonicNow();
+        lastLoadTimeMs = timer.monotonicNow();
       }
       lastLoadedTxnId = image.getLastAppliedTxId();
       return editsLoaded;
@@ -395,7 +401,7 @@ public long getLastLoadTimeMs() {
    */
   private boolean tooLongSinceLastLoad() {
     return logRollPeriodMs >= 0 &&
-      (monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
+      (timer.monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
   }
 
   /**
@@ -423,9 +429,9 @@ void triggerActiveLogRoll() {
     try {
       future = rollEditsRpcExecutor.submit(getNameNodeProxy());
       future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
-      lastRollTimeMs = monotonicNow();
+      this.lastRollTimeMs = timer.monotonicNow();
       lastRollTriggerTxId = lastLoadedTxnId;
-    } catch (ExecutionException e) {
+    } catch (ExecutionException | InterruptedException e) {
       LOG.warn("Unable to trigger a roll of the active NN", e);
     } catch (TimeoutException e) {
       if (future != null) {
@@ -433,11 +439,30 @@ void triggerActiveLogRoll() {
       }
       LOG.warn(String.format(
           "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
-    } catch (InterruptedException e) {
-      LOG.warn("Unable to trigger a roll of the active NN", e);
     }
   }
 
+  /**
+   * This is only to be used by tests. For source code, the only way to
+   * set timer is by using EditLogTailer constructor.
+   *
+   * @param newTimer Timer instance provided by tests.
+   */
+  @VisibleForTesting
+  void setTimerForTest(final Timer newTimer) {
+    this.timer = newTimer;
+  }
+
+  /**
+   * Used by tests. Return Timer instance used by EditLogTailer.
+   *
+   * @return Return Timer instance used by EditLogTailer.
+   */
+  @VisibleForTesting
+  Timer getTimer() {
+    return timer;
+  }
+
   @VisibleForTesting
   void sleep(long sleepTimeMillis) throws InterruptedException {
     Thread.sleep(sleepTimeMillis);
@@ -497,7 +522,7 @@ private void doWork() {
           // name system lock will be acquired to further block even the block
           // state updates.
          namesystem.cpLockInterruptibly();
-          long startTime = Time.monotonicNow();
+          long startTime = timer.monotonicNow();
          try {
            NameNode.getNameNodeMetrics().addEditLogTailInterval(
                startTime - lastLoadTimeMs);
@@ -505,7 +530,7 @@ private void doWork() {
          } finally {
            namesystem.cpUnlock();
            NameNode.getNameNodeMetrics().addEditLogTailTime(
-                Time.monotonicNow() - startTime);
+                timer.monotonicNow() - startTime);
          }
          //Update NameDirSize Metric
          if (triggeredLogRoll) {
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.FakeTimer;
 import org.slf4j.event.Level;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -394,13 +395,15 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
     // Time in seconds to wait before checking if edit logs are rolled while
     // expecting no edit log roll
     final int noLogRollWaitTime = 2;
 
     // Time in seconds to wait before checking if edit logs are rolled while
-    // expecting edit log roll
+    // expecting edit log roll.
     final int logRollWaitTime = 3;
+
+    final int logRollPeriod = standbyCatchupWaitTime + noLogRollWaitTime + 1;
+    final long logRollPeriodMs = TimeUnit.SECONDS.toMillis(logRollPeriod);
     Configuration conf = getConf();
-    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
-        standbyCatchupWaitTime + noLogRollWaitTime + 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriod);
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
 
@@ -429,19 +432,29 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
       waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
           standbyCatchupWaitTime);
 
+      long curTime = standby.getNamesystem().getEditLogTailer().getTimer()
+          .monotonicNow();
+      long insufficientTimeForLogRoll = logRollPeriodMs / 3;
+      final FakeTimer testTimer =
+          new FakeTimer(curTime + insufficientTimeForLogRoll);
+      standby.getNamesystem().getEditLogTailer().setTimerForTest(testTimer);
+      Thread.sleep(2000);
+
       for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
         NameNodeAdapter.mkdirs(active, getDirPath(i),
             new PermissionStatus("test", "test",
                 new FsPermission((short)00755)), true);
       }
 
-      boolean exceptionThrown = false;
       try {
         checkForLogRoll(active, origTxId, noLogRollWaitTime);
+        fail("Expected to timeout");
       } catch (TimeoutException e) {
-        exceptionThrown = true;
+        // expected
       }
-      assertTrue(exceptionThrown);
+
+      long sufficientTimeForLogRoll = logRollPeriodMs * 3;
+      testTimer.advance(sufficientTimeForLogRoll);
 
       checkForLogRoll(active, origTxId, logRollWaitTime);
     } finally {
@@ -452,26 +465,20 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
   private static void waitForStandbyToCatchUpWithInProgressEdits(
       final NameNode standby, final long activeTxId,
       int maxWaitSec) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
+    GenericTestUtils.waitFor(() -> {
       long standbyTxId = standby.getNamesystem().getFSImage()
           .getLastAppliedTxId();
       return (standbyTxId >= activeTxId);
-      }
-    }, 100, maxWaitSec * 1000);
+    }, 100, TimeUnit.SECONDS.toMillis(maxWaitSec));
   }
 
   private static void checkForLogRoll(final NameNode active,
       final long origTxId, int maxWaitSec) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
+    GenericTestUtils.waitFor(() -> {
       long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
           .getCurSegmentTxId();
       return (origTxId != curSegmentTxId);
-      }
-    }, 100, maxWaitSec * 1000);
+    }, 100, TimeUnit.SECONDS.toMillis(maxWaitSec));
   }
 
   private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
@@ -488,4 +495,5 @@ private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
         .build();
     return cluster;
   }
+
 }