From aa9cdf2af6fd84aa24ec5a19da4f955472a8d5bd Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Thu, 26 Aug 2021 13:07:38 +0530 Subject: [PATCH] HDFS-16143. Add Timer in EditLogTailer and de-flake TestEditLogTailer#testStandbyTriggersLogRollsWhenTailInProgressEdits (#3235) Contributed by Viraj Jasani. Signed-off-by: Mingliang Liu Signed-off-by: Takanobu Asanuma Signed-off-by: Wei-Chiu Chuang --- .../org/apache/hadoop/util/FakeTimer.java | 10 ++++ .../server/namenode/ha/EditLogTailer.java | 59 +++++++++++++------ .../server/namenode/ha/TestEditLogTailer.java | 52 +++++++++------- 3 files changed, 82 insertions(+), 39 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java index 05d66d39f5..17d20ea663 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/FakeTimer.java @@ -39,6 +39,16 @@ public FakeTimer() { nowNanos = TimeUnit.MILLISECONDS.toNanos(1000); } + /** + * FakeTimer constructor with milliseconds to keep as initial value. + * + * @param time time in millis. 
+ */ + public FakeTimer(long time) { + now = time; + nowNanos = TimeUnit.MILLISECONDS.toNanos(time); + } + @Override public long now() { return now; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index b82fb5b0e4..c4934bd1cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -55,12 +56,10 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.security.SecurityUtil; -import static org.apache.hadoop.util.Time.monotonicNow; import static org.apache.hadoop.util.ExitUtil.terminate; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import org.apache.hadoop.util.Time; /** @@ -172,14 +171,21 @@ public class EditLogTailer { */ private final long maxTxnsPerLock; + /** + * Timer instance to be set only using constructor. + * Only tests can reassign this by using setTimerForTest(). + * For source code, this timer instance should be treated as final. 
+ */ + private Timer timer; + public EditLogTailer(FSNamesystem namesystem, Configuration conf) { this.tailerThread = new EditLogTailerThread(); this.conf = conf; this.namesystem = namesystem; + this.timer = new Timer(); this.editLog = namesystem.getEditLog(); - - lastLoadTimeMs = monotonicNow(); - lastRollTimeMs = monotonicNow(); + this.lastLoadTimeMs = timer.monotonicNow(); + this.lastRollTimeMs = timer.monotonicNow(); logRollPeriodMs = conf.getTimeDuration( DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, @@ -301,7 +307,7 @@ public Void run() throws Exception { long editsTailed = 0; // Fully tail the journal to the end do { - long startTime = Time.monotonicNow(); + long startTime = timer.monotonicNow(); try { NameNode.getNameNodeMetrics().addEditLogTailInterval( startTime - lastLoadTimeMs); @@ -312,7 +318,7 @@ public Void run() throws Exception { throw new IOException(e); } finally { NameNode.getNameNodeMetrics().addEditLogTailTime( - Time.monotonicNow() - startTime); + timer.monotonicNow() - startTime); } } while(editsTailed > 0); return null; @@ -336,7 +342,7 @@ public long doTailEdits() throws IOException, InterruptedException { LOG.debug("lastTxnId: " + lastTxnId); } Collection streams; - long startTime = Time.monotonicNow(); + long startTime = timer.monotonicNow(); try { streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, inProgressOk, true); @@ -349,7 +355,7 @@ public long doTailEdits() throws IOException, InterruptedException { return 0; } finally { NameNode.getNameNodeMetrics().addEditLogFetchTime( - Time.monotonicNow() - startTime); + timer.monotonicNow() - startTime); } if (LOG.isDebugEnabled()) { LOG.debug("edit streams to load from: " + streams.size()); @@ -374,7 +380,7 @@ public long doTailEdits() throws IOException, InterruptedException { } if (editsLoaded > 0) { - lastLoadTimeMs = monotonicNow(); + lastLoadTimeMs = timer.monotonicNow(); } lastLoadedTxnId = image.getLastAppliedTxId(); return editsLoaded; @@ -395,7 +401,7 @@ public long 
getLastLoadTimeMs() { */ private boolean tooLongSinceLastLoad() { return logRollPeriodMs >= 0 && - (monotonicNow() - lastRollTimeMs) > logRollPeriodMs; + (timer.monotonicNow() - lastRollTimeMs) > logRollPeriodMs; } /** @@ -423,9 +429,9 @@ void triggerActiveLogRoll() { try { future = rollEditsRpcExecutor.submit(getNameNodeProxy()); future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); - lastRollTimeMs = monotonicNow(); + this.lastRollTimeMs = timer.monotonicNow(); lastRollTriggerTxId = lastLoadedTxnId; - } catch (ExecutionException e) { + } catch (ExecutionException | InterruptedException e) { LOG.warn("Unable to trigger a roll of the active NN", e); } catch (TimeoutException e) { if (future != null) { @@ -433,11 +439,30 @@ void triggerActiveLogRoll() { } LOG.warn(String.format( "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs)); - } catch (InterruptedException e) { - LOG.warn("Unable to trigger a roll of the active NN", e); } } + /** + * This is only to be used by tests. For source code, the only way to + * set timer is by using EditLogTailer constructor. + * + * @param newTimer Timer instance provided by tests. + */ + @VisibleForTesting + void setTimerForTest(final Timer newTimer) { + this.timer = newTimer; + } + + /** + * Used by tests. Return Timer instance used by EditLogTailer. + * + * @return Return Timer instance used by EditLogTailer. + */ + @VisibleForTesting + Timer getTimer() { + return timer; + } + @VisibleForTesting void sleep(long sleepTimeMillis) throws InterruptedException { Thread.sleep(sleepTimeMillis); @@ -497,7 +522,7 @@ private void doWork() { // name system lock will be acquired to further block even the block // state updates. 
namesystem.cpLockInterruptibly(); - long startTime = Time.monotonicNow(); + long startTime = timer.monotonicNow(); try { NameNode.getNameNodeMetrics().addEditLogTailInterval( startTime - lastLoadTimeMs); @@ -505,7 +530,7 @@ private void doWork() { } finally { namesystem.cpUnlock(); NameNode.getNameNodeMetrics().addEditLogTailTime( - Time.monotonicNow() - startTime); + timer.monotonicNow() - startTime); } //Update NameDirSize Metric if (triggeredLogRoll) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 38e7df5f39..23569130c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.FakeTimer; import org.slf4j.event.Level; import org.junit.Test; import org.junit.runner.RunWith; @@ -394,13 +395,15 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() // Time in seconds to wait before checking if edit logs are rolled while // expecting no edit log roll final int noLogRollWaitTime = 2; + // Time in seconds to wait before checking if edit logs are rolled while - // expecting edit log roll + // expecting edit log roll. 
final int logRollWaitTime = 3; + final int logRollPeriod = standbyCatchupWaitTime + noLogRollWaitTime + 1; + final long logRollPeriodMs = TimeUnit.SECONDS.toMillis(logRollPeriod); Configuration conf = getConf(); - conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, - standbyCatchupWaitTime + noLogRollWaitTime + 1); + conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, logRollPeriod); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); @@ -429,19 +432,29 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId, standbyCatchupWaitTime); + long curTime = standby.getNamesystem().getEditLogTailer().getTimer() + .monotonicNow(); + long insufficientTimeForLogRoll = logRollPeriodMs / 3; + final FakeTimer testTimer = + new FakeTimer(curTime + insufficientTimeForLogRoll); + standby.getNamesystem().getEditLogTailer().setTimerForTest(testTimer); + Thread.sleep(2000); + for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) { NameNodeAdapter.mkdirs(active, getDirPath(i), new PermissionStatus("test", "test", new FsPermission((short)00755)), true); } - boolean exceptionThrown = false; try { checkForLogRoll(active, origTxId, noLogRollWaitTime); + fail("Expected to timeout"); } catch (TimeoutException e) { - exceptionThrown = true; + // expected } - assertTrue(exceptionThrown); + + long sufficientTimeForLogRoll = logRollPeriodMs * 3; + testTimer.advance(sufficientTimeForLogRoll); checkForLogRoll(active, origTxId, logRollWaitTime); } finally { @@ -452,26 +465,20 @@ public void testStandbyTriggersLogRollsWhenTailInProgressEdits() private static void waitForStandbyToCatchUpWithInProgressEdits( final NameNode standby, final long activeTxId, int maxWaitSec) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - long standbyTxId = standby.getNamesystem().getFSImage() - .getLastAppliedTxId(); 
- return (standbyTxId >= activeTxId); - } - }, 100, maxWaitSec * 1000); + GenericTestUtils.waitFor(() -> { + long standbyTxId = standby.getNamesystem().getFSImage() + .getLastAppliedTxId(); + return (standbyTxId >= activeTxId); + }, 100, TimeUnit.SECONDS.toMillis(maxWaitSec)); } private static void checkForLogRoll(final NameNode active, final long origTxId, int maxWaitSec) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog() - .getCurSegmentTxId(); - return (origTxId != curSegmentTxId); - } - }, 100, maxWaitSec * 1000); + GenericTestUtils.waitFor(() -> { + long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog() + .getCurSegmentTxId(); + return (origTxId != curSegmentTxId); + }, 100, TimeUnit.SECONDS.toMillis(maxWaitSec)); } private static MiniDFSCluster createMiniDFSCluster(Configuration conf, @@ -488,4 +495,5 @@ private static MiniDFSCluster createMiniDFSCluster(Configuration conf, .build(); return cluster; } + }