HDFS-16550. Allow JN edit cache size to be set as a fraction of heap memory (#4209)
This commit is contained in:
parent
7786600744
commit
2067fcb646
@ -1424,7 +1424,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
|
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
|
||||||
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
|
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
|
||||||
"dfs.journalnode.edit-cache-size.bytes";
|
"dfs.journalnode.edit-cache-size.bytes";
|
||||||
public static final int DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT = 1024 * 1024;
|
|
||||||
|
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY =
|
||||||
|
"dfs.journalnode.edit-cache-size.fraction";
|
||||||
|
public static final float DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT = 0.5f;
|
||||||
|
|
||||||
// Journal-node related configs for the client side.
|
// Journal-node related configs for the client side.
|
||||||
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
|
public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
|
||||||
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||||
import org.apache.hadoop.util.AutoCloseableLock;
|
import org.apache.hadoop.util.AutoCloseableLock;
|
||||||
|
import org.apache.hadoop.util.Preconditions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An in-memory cache of edits in their serialized form. This is used to serve
|
* An in-memory cache of edits in their serialized form. This is used to serve
|
||||||
@ -121,12 +122,18 @@ class JournaledEditsCache {
|
|||||||
// ** End lock-protected fields **
|
// ** End lock-protected fields **
|
||||||
|
|
||||||
JournaledEditsCache(Configuration conf) {
|
JournaledEditsCache(Configuration conf) {
|
||||||
|
float fraction = conf.getFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_DEFAULT);
|
||||||
|
Preconditions.checkArgument((fraction > 0 && fraction < 1.0f),
|
||||||
|
String.format("Cache config %s is set at %f, it should be a positive float value, " +
|
||||||
|
"less than 1.0. The recommended value is less than 0.9.",
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, fraction));
|
||||||
capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
|
capacity = conf.getInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_DEFAULT);
|
(int) (Runtime.getRuntime().maxMemory() * fraction));
|
||||||
if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
|
if (capacity > 0.9 * Runtime.getRuntime().maxMemory()) {
|
||||||
Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
|
Journal.LOG.warn(String.format("Cache capacity is set at %d bytes but " +
|
||||||
"maximum JVM memory is only %d bytes. It is recommended that you " +
|
"maximum JVM memory is only %d bytes. It is recommended that you " +
|
||||||
"decrease the cache size or increase the heap size.",
|
"decrease the cache size/fraction or increase the heap size.",
|
||||||
capacity, Runtime.getRuntime().maxMemory()));
|
capacity, Runtime.getRuntime().maxMemory()));
|
||||||
}
|
}
|
||||||
Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
|
Journal.LOG.info("Enabling the journaled edits cache with a capacity " +
|
||||||
@ -277,11 +284,12 @@ class JournaledEditsCache {
|
|||||||
initialize(INVALID_TXN_ID);
|
initialize(INVALID_TXN_ID);
|
||||||
Journal.LOG.warn(String.format("A single batch of edits was too " +
|
Journal.LOG.warn(String.format("A single batch of edits was too " +
|
||||||
"large to fit into the cache: startTxn = %d, endTxn = %d, " +
|
"large to fit into the cache: startTxn = %d, endTxn = %d, " +
|
||||||
"input length = %d. The capacity of the cache (%s) must be " +
|
"input length = %d. The cache size (%s) or cache fraction (%s) must be " +
|
||||||
"increased for it to work properly (current capacity %d)." +
|
"increased for it to work properly (current capacity %d)." +
|
||||||
"Cache is now empty.",
|
"Cache is now empty.",
|
||||||
newStartTxn, newEndTxn, inputData.length,
|
newStartTxn, newEndTxn, inputData.length,
|
||||||
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity));
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, capacity));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (dataMap.isEmpty()) {
|
if (dataMap.isEmpty()) {
|
||||||
@ -388,10 +396,11 @@ class JournaledEditsCache {
|
|||||||
} else {
|
} else {
|
||||||
return new CacheMissException(lowestTxnId - requestedTxnId,
|
return new CacheMissException(lowestTxnId - requestedTxnId,
|
||||||
"Oldest txn ID available in the cache is %d, but requested txns " +
|
"Oldest txn ID available in the cache is %d, but requested txns " +
|
||||||
"starting at %d. The cache size (%s) may need to be increased " +
|
"starting at %d. The cache size (%s) or cache fraction (%s) may need to be " +
|
||||||
"to hold more transactions (currently %d bytes containing %d " +
|
"increased to hold more transactions (currently %d bytes containing %d " +
|
||||||
"transactions)", lowestTxnId, requestedTxnId,
|
"transactions)", lowestTxnId, requestedTxnId,
|
||||||
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, capacity,
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, capacity,
|
||||||
highestTxnId - lowestTxnId + 1);
|
highestTxnId - lowestTxnId + 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -414,4 +423,9 @@ class JournaledEditsCache {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
int getCapacity() {
|
||||||
|
return capacity;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4945,7 +4945,7 @@
|
|||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.journalnode.edit-cache-size.bytes</name>
|
<name>dfs.journalnode.edit-cache-size.bytes</name>
|
||||||
<value>1048576</value>
|
<value></value>
|
||||||
<description>
|
<description>
|
||||||
The size, in bytes, of the in-memory cache of edits to keep on the
|
The size, in bytes, of the in-memory cache of edits to keep on the
|
||||||
JournalNode. This cache is used to serve edits for tailing via the RPC-based
|
JournalNode. This cache is used to serve edits for tailing via the RPC-based
|
||||||
@ -4955,6 +4955,22 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.journalnode.edit-cache-size.fraction</name>
|
||||||
|
<value>0.5f</value>
|
||||||
|
<description>
|
||||||
|
This ratio refers to the proportion of the maximum memory of the JVM.
|
||||||
|
Used to calculate the size of the edits cache that is kept in the JournalNode's memory.
|
||||||
|
This config is an alternative to the dfs.journalnode.edit-cache-size.bytes.
|
||||||
|
And it is used to serve edits for tailing via the RPC-based mechanism, and is only
|
||||||
|
enabled when dfs.ha.tail-edits.in-progress is true. Transactions range in size but
|
||||||
|
are around 200 bytes on average, so the default of 1MB can store around 5000 transactions.
|
||||||
|
So we can configure a reasonable value based on the maximum memory. The recommended value
|
||||||
|
is less than 0.9. If we set dfs.journalnode.edit-cache-size.bytes, this parameter will
|
||||||
|
not take effect.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.journalnode.kerberos.internal.spnego.principal</name>
|
<name>dfs.journalnode.kerberos.internal.spnego.principal</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
|
@ -502,6 +502,16 @@ lag time will be much longer. The relevant configurations are:
|
|||||||
the oldest data in the cache was at transaction ID 20, a value of 10 would be added to the
|
the oldest data in the cache was at transaction ID 20, a value of 10 would be added to the
|
||||||
average.
|
average.
|
||||||
|
|
||||||
|
* **dfs.journalnode.edit-cache-size.fraction** - This fraction refers to the proportion of
|
||||||
|
the maximum memory of the JVM. Used to calculate the size of the edits cache that is
|
||||||
|
kept in the JournalNode's memory. This config is an alternative to the
|
||||||
|
dfs.journalnode.edit-cache-size.bytes. And it is used to serve edits for tailing via
|
||||||
|
the RPC-based mechanism, and is only enabled when dfs.ha.tail-edits.in-progress is true.
|
||||||
|
Transactions range in size but are around 200 bytes on average, so the default of 1MB
|
||||||
|
can store around 5000 transactions. So we can configure a reasonable value based on
|
||||||
|
the maximum memory. The recommended value is less than 0.9. If we set
|
||||||
|
dfs.journalnode.edit-cache-size.bytes, this parameter will not take effect.
|
||||||
|
|
||||||
This feature is primarily useful in conjunction with the Standby/Observer Read feature. Using this
|
This feature is primarily useful in conjunction with the Standby/Observer Read feature. Using this
|
||||||
feature, read requests can be serviced from non-active NameNodes; thus tailing in-progress edits
|
feature, read requests can be serviced from non-active NameNodes; thus tailing in-progress edits
|
||||||
provides these nodes with the ability to serve requests with data which is much more fresh. See the
|
provides these nodes with the ability to serve requests with data which is much more fresh. See the
|
||||||
|
@ -194,6 +194,24 @@ few configurations to your **hdfs-site.xml**:
|
|||||||
<value>1048576</value>
|
<value>1048576</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
* **dfs.journalnode.edit-cache-size.fraction** - the fraction refers to
|
||||||
|
the proportion of the maximum memory of the JVM.
|
||||||
|
|
||||||
|
Used to calculate the size of the edits cache that
|
||||||
|
is kept in the JournalNode's memory.
|
||||||
|
This config is an alternative to the dfs.journalnode.edit-cache-size.bytes.
|
||||||
|
And it is used to serve edits for tailing via the RPC-based mechanism, and is only
|
||||||
|
enabled when dfs.ha.tail-edits.in-progress is true. Transactions range in size but
|
||||||
|
are around 200 bytes on average, so the default of 1MB can store around 5000 transactions.
|
||||||
|
So we can configure a reasonable value based on the maximum memory. The recommended value
|
||||||
|
is less than 0.9. If we set dfs.journalnode.edit-cache-size.bytes, this parameter will
|
||||||
|
not take effect.
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.journalnode.edit-cache-size.fraction</name>
|
||||||
|
<value>0.5f</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
* **dfs.namenode.accesstime.precision** -- whether to enable access
|
* **dfs.namenode.accesstime.precision** -- whether to enable access
|
||||||
time for HDFS file.
|
time for HDFS file.
|
||||||
|
|
||||||
|
@ -221,6 +221,27 @@ public class TestJournaledEditsCache {
|
|||||||
cache.retrieveEdits(-1, 10, new ArrayList<>());
|
cache.retrieveEdits(-1, 10, new ArrayList<>());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheSizeConfigs() {
|
||||||
|
// Assert the default configs.
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
cache = new JournaledEditsCache(config);
|
||||||
|
assertEquals((int) (Runtime.getRuntime().maxMemory() * 0.5f), cache.getCapacity());
|
||||||
|
|
||||||
|
// Set dfs.journalnode.edit-cache-size.bytes.
|
||||||
|
Configuration config1 = new Configuration();
|
||||||
|
config1.setInt(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY, 1);
|
||||||
|
config1.setFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, 0.1f);
|
||||||
|
cache = new JournaledEditsCache(config1);
|
||||||
|
assertEquals(1, cache.getCapacity());
|
||||||
|
|
||||||
|
// Don't set dfs.journalnode.edit-cache-size.bytes.
|
||||||
|
Configuration config2 = new Configuration();
|
||||||
|
config2.setFloat(DFSConfigKeys.DFS_JOURNALNODE_EDIT_CACHE_SIZE_FRACTION_KEY, 0.1f);
|
||||||
|
cache = new JournaledEditsCache(config2);
|
||||||
|
assertEquals((int) (Runtime.getRuntime().maxMemory() * 0.1f), cache.getCapacity());
|
||||||
|
}
|
||||||
|
|
||||||
private void storeEdits(int startTxn, int endTxn) throws Exception {
|
private void storeEdits(int startTxn, int endTxn) throws Exception {
|
||||||
cache.storeEdits(createTxnData(startTxn, endTxn - startTxn + 1), startTxn,
|
cache.storeEdits(createTxnData(startTxn, endTxn - startTxn + 1), startTxn,
|
||||||
endTxn, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
endTxn, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user