HDFS-16872. Fix log throttling by declaring LogThrottlingHelper as static members (#5246)
Co-authored-by: Chengbing Liu <liuchengbing@qiyi.com> Signed-off-by: Erik Krogen <xkrogen@apache.org>
This commit is contained in:
parent
f3cff032e6
commit
4cf304de45
@ -65,7 +65,7 @@
|
|||||||
* <p>This class can also be used to coordinate multiple logging points; see
|
* <p>This class can also be used to coordinate multiple logging points; see
|
||||||
* {@link #record(String, long, double...)} for more details.
|
* {@link #record(String, long, double...)} for more details.
|
||||||
*
|
*
|
||||||
* <p>This class is not thread-safe.
|
* <p>This class is thread-safe.
|
||||||
*/
|
*/
|
||||||
public class LogThrottlingHelper {
|
public class LogThrottlingHelper {
|
||||||
|
|
||||||
@ -192,7 +192,7 @@ public LogThrottlingHelper(long minLogPeriodMs, String primaryRecorderName) {
|
|||||||
* @return A LogAction indicating whether or not the caller should write to
|
* @return A LogAction indicating whether or not the caller should write to
|
||||||
* its log.
|
* its log.
|
||||||
*/
|
*/
|
||||||
public LogAction record(double... values) {
|
public synchronized LogAction record(double... values) {
|
||||||
return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values);
|
return record(DEFAULT_RECORDER_NAME, timer.monotonicNow(), values);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,7 +244,7 @@ public LogAction record(double... values) {
|
|||||||
*
|
*
|
||||||
* @see #record(double...)
|
* @see #record(double...)
|
||||||
*/
|
*/
|
||||||
public LogAction record(String recorderName, long currentTimeMs,
|
public synchronized LogAction record(String recorderName, long currentTimeMs,
|
||||||
double... values) {
|
double... values) {
|
||||||
if (primaryRecorderName == null) {
|
if (primaryRecorderName == null) {
|
||||||
primaryRecorderName = recorderName;
|
primaryRecorderName = recorderName;
|
||||||
@ -287,7 +287,7 @@ public LogAction record(String recorderName, long currentTimeMs,
|
|||||||
* @param idx The index value.
|
* @param idx The index value.
|
||||||
* @return The summary information.
|
* @return The summary information.
|
||||||
*/
|
*/
|
||||||
public SummaryStatistics getCurrentStats(String recorderName, int idx) {
|
public synchronized SummaryStatistics getCurrentStats(String recorderName, int idx) {
|
||||||
LoggingAction currentLog = currentLogs.get(recorderName);
|
LoggingAction currentLog = currentLogs.get(recorderName);
|
||||||
if (currentLog != null) {
|
if (currentLog != null) {
|
||||||
return currentLog.getStats(idx);
|
return currentLog.getStats(idx);
|
||||||
@ -314,6 +314,13 @@ public static String getLogSupressionMessage(LogAction action) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized void reset() {
|
||||||
|
primaryRecorderName = null;
|
||||||
|
currentLogs.clear();
|
||||||
|
lastLogTimestampMs = Long.MIN_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A standard log action which keeps track of all of the values which have
|
* A standard log action which keeps track of all of the values which have
|
||||||
* been logged. This is also used for internal bookkeeping via its private
|
* been logged. This is also used for internal bookkeeping via its private
|
||||||
|
@ -132,7 +132,8 @@ public class FSEditLogLoader {
|
|||||||
/** Limit logging about edit loading to every 5 seconds max. */
|
/** Limit logging about edit loading to every 5 seconds max. */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
|
static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
|
||||||
private final LogThrottlingHelper loadEditsLogHelper =
|
@VisibleForTesting
|
||||||
|
static final LogThrottlingHelper LOAD_EDITS_LOG_HELPER =
|
||||||
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
|
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
|
||||||
|
|
||||||
private final FSNamesystem fsNamesys;
|
private final FSNamesystem fsNamesys;
|
||||||
@ -173,7 +174,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
|
|||||||
fsNamesys.writeLock();
|
fsNamesys.writeLock();
|
||||||
try {
|
try {
|
||||||
long startTime = timer.monotonicNow();
|
long startTime = timer.monotonicNow();
|
||||||
LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
|
LogAction preLogAction = LOAD_EDITS_LOG_HELPER.record("pre", startTime);
|
||||||
if (preLogAction.shouldLog()) {
|
if (preLogAction.shouldLog()) {
|
||||||
FSImage.LOG.info("Start loading edits file " + edits.getName()
|
FSImage.LOG.info("Start loading edits file " + edits.getName()
|
||||||
+ " maxTxnsToRead = " + maxTxnsToRead +
|
+ " maxTxnsToRead = " + maxTxnsToRead +
|
||||||
@ -182,7 +183,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
|
|||||||
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
|
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
|
||||||
maxTxnsToRead, startOpt, recovery);
|
maxTxnsToRead, startOpt, recovery);
|
||||||
long endTime = timer.monotonicNow();
|
long endTime = timer.monotonicNow();
|
||||||
LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
|
LogAction postLogAction = LOAD_EDITS_LOG_HELPER.record("post", endTime,
|
||||||
numEdits, edits.length(), endTime - startTime);
|
numEdits, edits.length(), endTime - startTime);
|
||||||
if (postLogAction.shouldLog()) {
|
if (postLogAction.shouldLog()) {
|
||||||
FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
|
FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
|
||||||
|
@ -47,7 +47,7 @@ class RedundantEditLogInputStream extends EditLogInputStream {
|
|||||||
|
|
||||||
/** Limit logging about fast forwarding the stream to every 5 seconds max. */
|
/** Limit logging about fast forwarding the stream to every 5 seconds max. */
|
||||||
private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
|
private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
|
||||||
private final LogThrottlingHelper fastForwardLoggingHelper =
|
private static final LogThrottlingHelper FAST_FORWARD_LOGGING_HELPER =
|
||||||
new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
|
new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -182,7 +182,7 @@ protected FSEditLogOp nextOp() throws IOException {
|
|||||||
case SKIP_UNTIL:
|
case SKIP_UNTIL:
|
||||||
try {
|
try {
|
||||||
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
|
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
|
||||||
LogAction logAction = fastForwardLoggingHelper.record();
|
LogAction logAction = FAST_FORWARD_LOGGING_HELPER.record();
|
||||||
if (logAction.shouldLog()) {
|
if (logAction.shouldLog()) {
|
||||||
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
|
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
|
||||||
"' to transaction ID " + (prevTxId + 1) +
|
"' to transaction ID " + (prevTxId + 1) +
|
||||||
|
@ -807,12 +807,13 @@ public void testErasureCodingPolicyOperations() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void setLoadFSEditLogThrottling() throws Exception {
|
public void testLoadFSEditLogThrottling() throws Exception {
|
||||||
FSNamesystem namesystem = mock(FSNamesystem.class);
|
FSNamesystem namesystem = mock(FSNamesystem.class);
|
||||||
namesystem.dir = mock(FSDirectory.class);
|
namesystem.dir = mock(FSDirectory.class);
|
||||||
|
|
||||||
FakeTimer timer = new FakeTimer();
|
FakeTimer timer = new FakeTimer();
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
|
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
|
||||||
|
FSEditLogLoader.LOAD_EDITS_LOG_HELPER.reset();
|
||||||
|
|
||||||
LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
|
LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
|
||||||
loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
|
loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
|
||||||
|
Loading…
Reference in New Issue
Block a user