HDFS-14370. Add exponential backoff to the edit log tailer to avoid spinning on empty edit tail requests. Contributed by Erik Krogen.
This commit is contained in:
parent
0520f5cede
commit
827dbb11e2
@ -1004,6 +1004,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
|
public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m
|
||||||
public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
|
public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
|
||||||
public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
|
public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
|
||||||
|
public static final String DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY = "dfs.ha.tail-edits.period.backoff-max";
|
||||||
|
public static final int DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT = 0; // disabled
|
||||||
public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
|
public static final String DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY = "dfs.ha.tail-edits.namenode-retries";
|
||||||
public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
|
public static final int DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT = 3;
|
||||||
public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
|
public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
|
||||||
|
@ -134,10 +134,17 @@ public class EditLogTailer {
|
|||||||
private final ExecutorService rollEditsRpcExecutor;
|
private final ExecutorService rollEditsRpcExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* How often the Standby should check if there are new finalized segment(s)
|
* How often the tailer should check if there are new edit log entries
|
||||||
* available to be read from.
|
* ready to be consumed. This is the initial delay before any backoff.
|
||||||
*/
|
*/
|
||||||
private final long sleepTimeMs;
|
private final long sleepTimeMs;
|
||||||
|
/**
|
||||||
|
* The maximum time the tailer should wait between checking for new edit log
|
||||||
|
* entries. Exponential backoff will be applied when an edit log tail is
|
||||||
|
* performed but no edits are available to be read. If this is less than or
|
||||||
|
* equal to 0, backoff is disabled.
|
||||||
|
*/
|
||||||
|
private final long maxSleepTimeMs;
|
||||||
|
|
||||||
private final int nnCount;
|
private final int nnCount;
|
||||||
private NamenodeProtocol cachedActiveProxy = null;
|
private NamenodeProtocol cachedActiveProxy = null;
|
||||||
@ -206,6 +213,19 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
|
|||||||
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
|
||||||
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT,
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT,
|
||||||
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
||||||
|
long maxSleepTimeMsTemp = conf.getTimeDuration(
|
||||||
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
||||||
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_DEFAULT,
|
||||||
|
TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
|
||||||
|
if (maxSleepTimeMsTemp > 0 && maxSleepTimeMsTemp < sleepTimeMs) {
|
||||||
|
LOG.warn("{} was configured to be {} ms, but this is less than {}."
|
||||||
|
+ "Disabling backoff when tailing edit logs.",
|
||||||
|
DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
||||||
|
maxSleepTimeMsTemp, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY);
|
||||||
|
maxSleepTimeMs = 0;
|
||||||
|
} else {
|
||||||
|
maxSleepTimeMs = maxSleepTimeMsTemp;
|
||||||
|
}
|
||||||
|
|
||||||
rollEditsTimeoutMs = conf.getTimeDuration(
|
rollEditsTimeoutMs = conf.getTimeDuration(
|
||||||
DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
|
DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
|
||||||
@ -291,7 +311,7 @@ public Void run() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void doTailEdits() throws IOException, InterruptedException {
|
public long doTailEdits() throws IOException, InterruptedException {
|
||||||
// Write lock needs to be interruptible here because the
|
// Write lock needs to be interruptible here because the
|
||||||
// transitionToActive RPC takes the write lock before calling
|
// transitionToActive RPC takes the write lock before calling
|
||||||
// tailer.stop() -- so if we're not interruptible, it will
|
// tailer.stop() -- so if we're not interruptible, it will
|
||||||
@ -316,7 +336,7 @@ public void doTailEdits() throws IOException, InterruptedException {
|
|||||||
// edits file hasn't been started yet.
|
// edits file hasn't been started yet.
|
||||||
LOG.warn("Edits tailer failed to find any streams. Will try again " +
|
LOG.warn("Edits tailer failed to find any streams. Will try again " +
|
||||||
"later.", ioe);
|
"later.", ioe);
|
||||||
return;
|
return 0;
|
||||||
} finally {
|
} finally {
|
||||||
NameNode.getNameNodeMetrics().addEditLogFetchTime(
|
NameNode.getNameNodeMetrics().addEditLogFetchTime(
|
||||||
Time.monotonicNow() - startTime);
|
Time.monotonicNow() - startTime);
|
||||||
@ -347,6 +367,7 @@ public void doTailEdits() throws IOException, InterruptedException {
|
|||||||
lastLoadTimeMs = monotonicNow();
|
lastLoadTimeMs = monotonicNow();
|
||||||
}
|
}
|
||||||
lastLoadedTxnId = image.getLastAppliedTxId();
|
lastLoadedTxnId = image.getLastAppliedTxId();
|
||||||
|
return editsLoaded;
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.writeUnlock();
|
namesystem.writeUnlock();
|
||||||
}
|
}
|
||||||
@ -407,6 +428,11 @@ void triggerActiveLogRoll() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void sleep(long sleepTimeMillis) throws InterruptedException {
|
||||||
|
Thread.sleep(sleepTimeMillis);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The thread which does the actual work of tailing edits journals and
|
* The thread which does the actual work of tailing edits journals and
|
||||||
* applying the transactions to the FSNS.
|
* applying the transactions to the FSNS.
|
||||||
@ -435,7 +461,9 @@ public Object run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void doWork() {
|
private void doWork() {
|
||||||
|
long currentSleepTimeMs = sleepTimeMs;
|
||||||
while (shouldRun) {
|
while (shouldRun) {
|
||||||
|
long editsTailed = 0;
|
||||||
try {
|
try {
|
||||||
// There's no point in triggering a log roll if the Standby hasn't
|
// There's no point in triggering a log roll if the Standby hasn't
|
||||||
// read any more transactions since the last time a roll was
|
// read any more transactions since the last time a roll was
|
||||||
@ -461,7 +489,7 @@ private void doWork() {
|
|||||||
try {
|
try {
|
||||||
NameNode.getNameNodeMetrics().addEditLogTailInterval(
|
NameNode.getNameNodeMetrics().addEditLogTailInterval(
|
||||||
startTime - lastLoadTimeMs);
|
startTime - lastLoadTimeMs);
|
||||||
doTailEdits();
|
editsTailed = doTailEdits();
|
||||||
} finally {
|
} finally {
|
||||||
namesystem.cpUnlock();
|
namesystem.cpUnlock();
|
||||||
NameNode.getNameNodeMetrics().addEditLogTailTime(
|
NameNode.getNameNodeMetrics().addEditLogTailTime(
|
||||||
@ -481,7 +509,17 @@ private void doWork() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(sleepTimeMs);
|
if (editsTailed == 0 && maxSleepTimeMs > 0) {
|
||||||
|
// If no edits were tailed, apply exponential backoff
|
||||||
|
// before tailing again. Double the current sleep time on each
|
||||||
|
// empty response, but don't exceed the max. If the sleep time
|
||||||
|
// was configured as 0, start the backoff at 1 ms.
|
||||||
|
currentSleepTimeMs = Math.min(maxSleepTimeMs,
|
||||||
|
(currentSleepTimeMs == 0 ? 1 : currentSleepTimeMs) * 2);
|
||||||
|
} else {
|
||||||
|
currentSleepTimeMs = sleepTimeMs; // reset to initial sleep time
|
||||||
|
}
|
||||||
|
EditLogTailer.this.sleep(currentSleepTimeMs);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.warn("Edit log tailer interrupted: {}", e.getMessage());
|
LOG.warn("Edit log tailer interrupted: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -1662,9 +1662,25 @@
|
|||||||
<name>dfs.ha.tail-edits.period</name>
|
<name>dfs.ha.tail-edits.period</name>
|
||||||
<value>60s</value>
|
<value>60s</value>
|
||||||
<description>
|
<description>
|
||||||
How often, in seconds, the StandbyNode should check for new
|
How often, the StandbyNode and ObserverNode should check if there are new
|
||||||
finalized log segments in the shared edits log.
|
edit log entries ready to be consumed. This is the minimum period between
|
||||||
Support multiple time unit suffix(case insensitive), as described
|
checking; exponential backoff will be applied if no edits are found and
|
||||||
|
dfs.ha.tail-edits.period.backoff-max is configured. By default, no
|
||||||
|
backoff is applied.
|
||||||
|
Supports multiple time unit suffix (case insensitive), as described
|
||||||
|
in dfs.heartbeat.interval.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.ha.tail-edits.period.backoff-max</name>
|
||||||
|
<value>0</value>
|
||||||
|
<description>
|
||||||
|
The maximum time the tailer should wait between checking for new edit log
|
||||||
|
entries. Exponential backoff will be applied when an edit log tail is
|
||||||
|
performed but no edits are available to be read. Values less than or
|
||||||
|
equal to zero disable backoff entirely; this is the default behavior.
|
||||||
|
Supports multiple time unit suffix (case insensitive), as described
|
||||||
in dfs.heartbeat.interval.
|
in dfs.heartbeat.interval.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
@ -140,13 +140,33 @@ few configurations to your **hdfs-site.xml**:
|
|||||||
If too large, RPC time will increase as client requests will wait
|
If too large, RPC time will increase as client requests will wait
|
||||||
longer in the RPC queue before Observer tails edit logs and catches
|
longer in the RPC queue before Observer tails edit logs and catches
|
||||||
up the latest state of Active. The default value is 1min. It is
|
up the latest state of Active. The default value is 1min. It is
|
||||||
**highly recommend** to configure this to a much lower value.
|
**highly recommend** to configure this to a much lower value. It is also
|
||||||
|
recommended to configure backoff to be enabled when using low values; please
|
||||||
|
see below.
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.ha.tail-edits.period</name>
|
<name>dfs.ha.tail-edits.period</name>
|
||||||
<value>0ms</value>
|
<value>0ms</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
* **dfs.ha.tail-edits.period.backoff-max** - whether the Standby/Observer
|
||||||
|
NameNodes should perform backoff when tailing edits.
|
||||||
|
|
||||||
|
This determines the behavior of a Standby/Observer when it attempts to
|
||||||
|
tail edits from the JournalNodes and finds no edits available. This is a
|
||||||
|
common situation when the edit tailing period is very low, but the cluster
|
||||||
|
is not heavily loaded. Without this configuration, such a situation will
|
||||||
|
cause high utilization on the Standby/Observer as it constantly attempts to
|
||||||
|
read edits even though there are none available. With this configuration
|
||||||
|
enabled, exponential backoff will be performed when an edit tail attempt
|
||||||
|
returns 0 edits. This configuration specifies the maximum time to wait
|
||||||
|
between edit tailing attempts.
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.ha.tail-edits.period</name>
|
||||||
|
<value>10s</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
* **dfs.journalnode.edit-cache-size.bytes** - the in-memory cache size,
|
* **dfs.journalnode.edit-cache-size.bytes** - the in-memory cache size,
|
||||||
in bytes, on the JournalNodes.
|
in bytes, on the JournalNodes.
|
||||||
|
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
@ -27,9 +28,14 @@
|
|||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
@ -42,8 +48,10 @@
|
|||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
@ -151,6 +159,46 @@ public void testTailer() throws IOException, InterruptedException,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTailerBackoff() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
NameNode.initMetrics(conf, HdfsServerConstants.NamenodeRole.NAMENODE);
|
||||||
|
conf.setTimeDuration(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
|
||||||
|
1, TimeUnit.MILLISECONDS);
|
||||||
|
conf.setTimeDuration(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_BACKOFF_MAX_KEY,
|
||||||
|
10, TimeUnit.MILLISECONDS);
|
||||||
|
FSNamesystem mockNamesystem = mock(FSNamesystem.class);
|
||||||
|
FSImage mockImage = mock(FSImage.class);
|
||||||
|
NNStorage mockStorage = mock(NNStorage.class);
|
||||||
|
when(mockNamesystem.getFSImage()).thenReturn(mockImage);
|
||||||
|
when(mockImage.getStorage()).thenReturn(mockStorage);
|
||||||
|
final Queue<Long> sleepDurations = new ConcurrentLinkedQueue<>();
|
||||||
|
final int zeroEditCount = 5;
|
||||||
|
final AtomicInteger tailEditsCallCount = new AtomicInteger(0);
|
||||||
|
EditLogTailer tailer = new EditLogTailer(mockNamesystem, conf) {
|
||||||
|
@Override
|
||||||
|
void sleep(long sleepTimeMs) {
|
||||||
|
if (sleepDurations.size() <= zeroEditCount) {
|
||||||
|
sleepDurations.add(sleepTimeMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long doTailEdits() {
|
||||||
|
return tailEditsCallCount.getAndIncrement() < zeroEditCount ? 0 : 1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tailer.start();
|
||||||
|
try {
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> sleepDurations.size() > zeroEditCount, 50, 10000);
|
||||||
|
} finally {
|
||||||
|
tailer.stop();
|
||||||
|
}
|
||||||
|
List<Long> expectedDurations = Arrays.asList(2L, 4L, 8L, 10L, 10L, 1L);
|
||||||
|
assertEquals(expectedDurations, new ArrayList<>(sleepDurations));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNN0TriggersLogRolls() throws Exception {
|
public void testNN0TriggersLogRolls() throws Exception {
|
||||||
testStandbyTriggersLogRolls(0);
|
testStandbyTriggersLogRolls(0);
|
||||||
|
Loading…
Reference in New Issue
Block a user