HDFS-13791. Limit logging frequency of edit tail related statements. Contributed by Erik Krogen.

This commit is contained in:
Chen Liang 2018-09-27 10:12:37 -07:00 committed by Konstantin V Shvachko
parent b74a7dbf88
commit a65bb97f5d
6 changed files with 147 additions and 15 deletions

View File

@ -288,6 +288,24 @@ public SummaryStatistics getCurrentStats(String recorderName, int idx) {
return null;
}
/**
* Helper function to create a message about how many log statements were
* suppressed in the provided log action. If no statements were suppressed,
* this returns an empty string. The message has the format (without quotes):
*
* <p/>' (suppressed logging <i>{suppression_count}</i> times)'
*
* @param action The log action to produce a message about.
* @return A message about suppression within this action.
*/
public static String getLogSupressionMessage(LogAction action) {
if (action.getCount() > 1) {
return " (suppressed logging " + (action.getCount() - 1) + " times)";
} else {
return "";
}
}
/**
* A standard log action which keeps track of all of the values which have
* been logged. This is also used for internal bookkeeping via its private

View File

@ -54,6 +54,8 @@
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@ -105,6 +107,11 @@ public class QuorumJournalManager implements JournalManager {
private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory;
/** Limit logging about input stream selection to every 5 seconds max. */
private static final long SELECT_INPUT_STREAM_LOG_INTERVAL_MS = 5000;
private final LogThrottlingHelper selectInputStreamLogHelper =
new LogThrottlingHelper(SELECT_INPUT_STREAM_LOG_INTERVAL_MS);
@VisibleForTesting
public QuorumJournalManager(Configuration conf,
URI uri,
@ -567,8 +574,12 @@ private void selectRpcInputStreams(Collection<EditLogInputStream> streams,
"ID " + fromTxnId);
return;
}
LOG.info("Selected loggers with >= " + maxAllowedTxns +
" transactions starting from " + fromTxnId);
LogAction logAction = selectInputStreamLogHelper.record(fromTxnId);
if (logAction.shouldLog()) {
LOG.info("Selected loggers with >= " + maxAllowedTxns + " transactions " +
"starting from lowest txn ID " + logAction.getStats(0).getMin() +
LogThrottlingHelper.getLogSupressionMessage(logAction));
}
PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<>(
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
for (GetJournaledEditsResponseProto resp : responseMap.values()) {

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.FilterInputStream;
import java.io.IOException;
@ -113,11 +112,16 @@
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.log.LogThrottlingHelper.LogAction;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLogLoader {
@ -125,16 +129,29 @@ public class FSEditLogLoader {
LoggerFactory.getLogger(FSEditLogLoader.class.getName());
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
/** Limit logging about edit loading to every 5 seconds max. */
@VisibleForTesting
static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
private final LogThrottlingHelper loadEditsLogHelper =
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
private final FSNamesystem fsNamesys;
private final BlockManager blockManager;
private final Timer timer;
private long lastAppliedTxId;
/** Total number of end transactions loaded. */
private int totalEdits = 0;
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
this(fsNamesys, lastAppliedTxId, new Timer());
}
@VisibleForTesting
FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId, Timer timer) {
this.fsNamesys = fsNamesys;
this.blockManager = fsNamesys.getBlockManager();
this.lastAppliedTxId = lastAppliedTxId;
this.timer = timer;
}
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
@ -155,14 +172,26 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
prog.beginStep(Phase.LOADING_EDITS, step);
fsNamesys.writeLock();
try {
long startTime = monotonicNow();
FSImage.LOG.info("Start loading edits file " + edits.getName()
+ " maxTxnsToRead = " + maxTxnsToRead);
long startTime = timer.monotonicNow();
LogAction preLogAction = loadEditsLogHelper.record("pre", startTime);
if (preLogAction.shouldLog()) {
FSImage.LOG.info("Start loading edits file " + edits.getName()
+ " maxTxnsToRead = " + maxTxnsToRead +
LogThrottlingHelper.getLogSupressionMessage(preLogAction));
}
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
maxTxnsToRead, startOpt, recovery);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
long endTime = timer.monotonicNow();
LogAction postLogAction = loadEditsLogHelper.record("post", endTime,
numEdits, edits.length(), endTime - startTime);
if (postLogAction.shouldLog()) {
FSImage.LOG.info("Loaded {} edits file(s) (the last named {}) of " +
"total size {}, total edits {}, total load time {} ms",
postLogAction.getCount(), edits.getName(),
postLogAction.getStats(1).getSum(),
postLogAction.getStats(0).getSum(),
postLogAction.getStats(2).getSum());
}
return numEdits;
} finally {
edits.close();
@ -203,7 +232,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
Step step = createStartupProgressStep(in);
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
long lastLogTime = monotonicNow();
long lastLogTime = timer.monotonicNow();
long lastInodeId = fsNamesys.dir.getLastInodeId();
try {
@ -283,7 +312,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
}
// log progress
if (op.hasTransactionId()) {
long now = monotonicNow();
long now = timer.monotonicNow();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
int percent = Math.round((float) deltaTxId / numTxns * 100);

View File

@ -69,6 +69,8 @@
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
@ -124,6 +126,11 @@ public class FSImage implements Closeable {
private final Set<Long> currentlyCheckpointing =
Collections.<Long>synchronizedSet(new HashSet<Long>());
/** Limit logging about edit loading to every 5 seconds max. */
private static final long LOAD_EDIT_LOG_INTERVAL_MS = 5000;
private final LogThrottlingHelper loadEditLogHelper =
new LogThrottlingHelper(LOAD_EDIT_LOG_INTERVAL_MS);
/**
* Construct an FSImage
* @param conf Configuration
@ -886,8 +893,16 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
LOG.info("Reading " + editIn + " expecting start txid #" +
(lastAppliedTxId + 1));
LogAction logAction = loadEditLogHelper.record();
if (logAction.shouldLog()) {
String logSuppressed = "";
if (logAction.getCount() > 1) {
logSuppressed = "; suppressed logging for " +
(logAction.getCount() - 1) + " edit reads";
}
LOG.info("Reading " + editIn + " expecting start txid #" +
(lastAppliedTxId + 1) + logSuppressed);
}
try {
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
startOpt, recovery);

View File

@ -28,6 +28,8 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.log.LogThrottlingHelper;
import org.apache.hadoop.log.LogThrottlingHelper.LogAction;
/**
* A merged input stream that handles failover between different edit logs.
@ -43,6 +45,11 @@ class RedundantEditLogInputStream extends EditLogInputStream {
private long prevTxId;
private final EditLogInputStream[] streams;
/** Limit logging about fast forwarding the stream to every 5 seconds max. */
private static final long FAST_FORWARD_LOGGING_INTERVAL_MS = 5000;
private final LogThrottlingHelper fastForwardLoggingHelper =
new LogThrottlingHelper(FAST_FORWARD_LOGGING_INTERVAL_MS);
/**
* States that the RedundantEditLogInputStream can be in.
*
@ -174,8 +181,12 @@ protected FSEditLogOp nextOp() throws IOException {
case SKIP_UNTIL:
try {
if (prevTxId != HdfsServerConstants.INVALID_TXID) {
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
"' to transaction ID " + (prevTxId + 1));
LogAction logAction = fastForwardLoggingHelper.record();
if (logAction.shouldLog()) {
LOG.info("Fast-forwarding stream '" + streams[curIdx].getName() +
"' to transaction ID " + (prevTxId + 1) +
LogThrottlingHelper.getLogSupressionMessage(logAction));
}
streams[curIdx].skipUntil(prevTxId + 1);
}
} catch (IOException e) {

View File

@ -19,10 +19,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.BufferedInputStream;
import java.io.File;
@ -61,7 +64,9 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.FakeTimer;
import org.slf4j.event.Level;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -101,6 +106,7 @@ private static Configuration getConf() {
private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class);
private static final int NUM_DATA_NODES = 0;
private static final String FAKE_EDIT_STREAM_NAME = "FAKE_STREAM";
private final ErasureCodingPolicy testECPolicy
= StripedFileTestUtil.getDefaultECPolicy();
@ -799,4 +805,46 @@ public void testErasureCodingPolicyOperations() throws IOException {
}
}
}
@Test
public void setLoadFSEditLogThrottling() throws Exception {
FSNamesystem namesystem = mock(FSNamesystem.class);
namesystem.dir = mock(FSDirectory.class);
FakeTimer timer = new FakeTimer();
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0, timer);
LogCapturer capture = LogCapturer.captureLogs(FSImage.LOG);
loader.loadFSEdits(getFakeEditLogInputStream(1, 10), 1);
assertTrue(capture.getOutput().contains("Start loading edits file " +
FAKE_EDIT_STREAM_NAME));
assertTrue(capture.getOutput().contains("Loaded 1 edits file(s)"));
assertFalse(capture.getOutput().contains("suppressed"));
timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS / 2);
capture.clearOutput();
loader.loadFSEdits(getFakeEditLogInputStream(11, 20), 11);
assertFalse(capture.getOutput().contains("Start loading edits file"));
assertFalse(capture.getOutput().contains("edits file(s)"));
timer.advance(FSEditLogLoader.LOAD_EDIT_LOG_INTERVAL_MS);
capture.clearOutput();
loader.loadFSEdits(getFakeEditLogInputStream(21, 30), 21);
assertTrue(capture.getOutput().contains("Start loading edits file " +
FAKE_EDIT_STREAM_NAME));
assertTrue(capture.getOutput().contains("suppressed logging 1 times"));
assertTrue(capture.getOutput().contains("Loaded 2 edits file(s)"));
assertTrue(capture.getOutput().contains("total size 2.0"));
}
private EditLogInputStream getFakeEditLogInputStream(long startTx, long endTx)
throws IOException {
EditLogInputStream fakeStream = mock(EditLogInputStream.class);
when(fakeStream.getName()).thenReturn(FAKE_EDIT_STREAM_NAME);
when(fakeStream.getFirstTxId()).thenReturn(startTx);
when(fakeStream.getLastTxId()).thenReturn(endTx);
when(fakeStream.length()).thenReturn(1L);
return fakeStream;
}
}