diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java index 1cb8ecfcf5..78b60e4117 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/LogThrottlingHelper.java @@ -65,7 +65,7 @@ *
This class can also be used to coordinate multiple logging points; see * {@link #record(String, long, double...)} for more details. * - *
This class is not thread-safe. + *
This class is thread-safe. */ public class LogThrottlingHelper { @@ -192,7 +192,7 @@ public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) { * @return A LogAction indicating whether or not the caller should write to * its log. */ - public LogAction record(double... values) { + public synchronized LogAction record(double... values) { return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values); } @@ -244,7 +244,7 @@ public LogAction record(double... values) { * * @see #record(double...) */ - public LogAction record(String recorderName, long currentTimeMs, + public synchronized LogAction record(String recorderName, long currentTimeMs, double... values) { if (primaryRecorderName == null) { primaryRecorderName = recorderName; @@ -287,7 +287,7 @@ public LogAction record(String recorderName, long currentTimeMs, * @param idx The index value. * @return The summary information. */ - public SummaryStatistics getCurrentStats(String recorderName, int idx) { + public synchronized SummaryStatistics getCurrentStats(String recorderName, int idx) { LoggingAction currentLog = currentLogs.get(recorderName); if (currentLog != null) { return currentLog.getStats(idx); @@ -314,6 +314,13 @@ public static String getLogSupressionMessage(LogAction action) { } } + @VisibleForTesting + public synchronized void reset() { + primaryRecorderName = null; + currentLogs.clear(); + lastLogTimestampMs = Long.MIN_VALUE; + } + /** * A standard log action which keeps track of all of the values which have * been logged. This is also used for internal bookkeeping via its private diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 6bb27d95be..03d403345a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -132,7 +132,8 @@ public class FSEditLogLoader { /** Limit logging about edit loading to every 5 seconds max. */ @VisibleForTesting static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000; - private final LogThrottlingHelper loadEditsLogHelper = + @VisibleForTesting + static final LogThrottlingHelper LOAD_EDITS_LOG_HELPER = new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS); private final FSNamesystem fsNamesys; @@ -173,7 +174,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, fsNamesys.writeLock(); try { long startTime = timer.monotonicNow(); - LogAction preLogAction = loadEditsLogHelper.record("pre", startTime); + LogAction preLogAction = LOAD_EDITS_LOG_HELPER.record("pre", startTime); if (preLogAction.shouldLog()) { FSImage.LOG.info("Start loading edits file " + edits.getName() + " maxTxnsToRead = " + maxTxnsToRead + @@ -182,7 +183,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, long numEdits = loadEditRecords(edits, false, expectedStartingTxId, maxTxnsToRead, startOpt, recovery); long endTime = timer.monotonicNow(); - LogAction postLogAction = loadEditsLogHelper.record("post", endTime, + LogAction postLogAction = LOAD_EDITS_LOG_HELPER.record("post", endTime, numEdits, edits.length(), endTime - startTime); if (postLogAction.shouldLog()) { FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java index 15f799ab21..c35a582d18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java @@ -47,7 +47,7 @@ class RedundantEditLogInputStream extends EditLogInputStream { /** Limit logging about fast forwarding the stream to every 5 seconds max. */ private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000; - private final LogThrottlingHelper fastForwardLoggingHelper = + private static final LogThrottlingHelper FAST_FORWARD_LOGGING_HELPER = new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS); /** @@ -182,7 +182,7 @@ protected FSEditLogOp nextOp() throws IOException { case SKIP_UNTIL: try { if (prevTxId != HdfsServerConstants.INVALID_TXID) { - LogAction logAction = fastForwardLoggingHelper.record(); + LogAction logAction = FAST_FORWARD_LOGGING_HELPER.record(); if (logAction.shouldLog()) { LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() + "' to transaction ID " + (prevTxId + 1) + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 8008be79d9..89193ca663 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -807,12 +807,13 @@ public void testErasureCodingPolicyOperations() throws IOException { } @Test - public void setLoadFSEditLogThrottling() throws Exception { + public void testLoadFSEditLogThrottling() throws Exception { FSNamesystem namesystem = mock(FSNamesystem.class); namesystem.dir = mock(FSDirectory.class); FakeTimer timer = new FakeTimer(); FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer); + FSEditLogLoader.LOAD_EDITS_LOG_HELPER.reset(); LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG); loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);