HDFS-13820. Add an ability to disable CacheReplicationMonitor. Contributed by Hrishikesh Gadre.
Signed-off-by: Xiao Chen <xiao@apache.org>
This commit is contained in:
parent
16333782c1
commit
335a8139f5
@ -390,6 +390,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
public static final String DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS =
|
||||
"dfs.namenode.path.based.cache.refresh.interval.ms";
|
||||
public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
|
||||
public static final String DFS_NAMENODE_CACHING_ENABLED_KEY =
|
||||
"dfs.namenode.caching.enabled";
|
||||
public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = true;
|
||||
|
||||
/** Pending period of block deletion since NameNode startup */
|
||||
public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY = "dfs.namenode.startup.delay.block.deletion.sec";
|
||||
|
@ -25,6 +25,8 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutputStream;
|
||||
@ -171,6 +173,21 @@ public class CacheManager {
|
||||
|
||||
private final SerializerCompat serializerCompat = new SerializerCompat();
|
||||
|
||||
/**
|
||||
* Whether caching is enabled.
|
||||
*
|
||||
* If caching is disabled, we will not process cache reports or store
|
||||
* information about what is cached where. We also do not start the
|
||||
* CacheReplicationMonitor thread. This will save resources, but provide
|
||||
* less functionality.
|
||||
*
|
||||
* Even when caching is disabled, we still store path-based cache
|
||||
* information. This information is stored in the edit log and fsimage. We
|
||||
* don't want to lose it just because a configuration setting was turned off.
|
||||
* However, we will not act on this information if caching is disabled.
|
||||
*/
|
||||
private final boolean enabled;
|
||||
|
||||
/**
|
||||
* The CacheReplicationMonitor.
|
||||
*/
|
||||
@ -194,6 +211,8 @@ public PersistState(CacheManagerSection section,
|
||||
this.namesystem = namesystem;
|
||||
this.blockManager = blockManager;
|
||||
this.nextDirectiveId = 1;
|
||||
this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
|
||||
DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
|
||||
this.maxListCachePoolsResponses = conf.getInt(
|
||||
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
|
||||
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
|
||||
@ -211,10 +230,13 @@ public PersistState(CacheManagerSection section,
|
||||
DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
|
||||
cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
|
||||
}
|
||||
this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
|
||||
this.cachedBlocks = enabled ? new LightWeightGSet<CachedBlock, CachedBlock>(
|
||||
LightWeightGSet.computeCapacity(cachedBlocksPercent,
|
||||
"cachedBlocks"));
|
||||
"cachedBlocks")) : new LightWeightGSet<>(0);
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -229,6 +251,12 @@ void clear() {
|
||||
}
|
||||
|
||||
public void startMonitorThread() {
|
||||
if (!isEnabled()) {
|
||||
LOG.info("Not starting CacheReplicationMonitor as name-node caching" +
|
||||
" is disabled.");
|
||||
return;
|
||||
}
|
||||
|
||||
crmLock.lock();
|
||||
try {
|
||||
if (this.monitor == null) {
|
||||
@ -242,6 +270,10 @@ public void startMonitorThread() {
|
||||
}
|
||||
|
||||
public void stopMonitorThread() {
|
||||
if (!isEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
crmLock.lock();
|
||||
try {
|
||||
if (this.monitor != null) {
|
||||
@ -945,6 +977,12 @@ private void setCachedLocations(LocatedBlock block) {
|
||||
|
||||
public final void processCacheReport(final DatanodeID datanodeID,
|
||||
final List<Long> blockIds) throws IOException {
|
||||
if (!enabled) {
|
||||
LOG.debug("Ignoring cache report from {} because {} = false. " +
|
||||
"number of blocks: {}", datanodeID,
|
||||
DFS_NAMENODE_CACHING_ENABLED_KEY, blockIds.size());
|
||||
return;
|
||||
}
|
||||
namesystem.writeLock();
|
||||
final long startTime = Time.monotonicNow();
|
||||
final long endTime;
|
||||
|
@ -2468,6 +2468,17 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.caching.enabled</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Set to true to enable block caching. This flag enables the NameNode to
|
||||
maintain a mapping of cached blocks to DataNodes via processing DataNode
|
||||
cache reports. Based on these reports and addition and removal of caching
|
||||
directives, the NameNode will schedule caching and uncaching work.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.path.based.cache.block.map.allocation.percent</name>
|
||||
<value>0.25</value>
|
||||
|
@ -238,6 +238,11 @@ The following properties are not required, but may be specified for tuning:
|
||||
|
||||
The percentage of the Java heap which we will allocate to the cached blocks map. The cached blocks map is a hash map which uses chained hashing. Smaller maps may be accessed more slowly if the number of cached blocks is large; larger maps will consume more memory. The default is 0.25 percent.
|
||||
|
||||
* dfs.namenode.caching.enabled
|
||||
|
||||
This parameter can be used to enable/disable the centralized caching in NameNode. When centralized caching is disabled, NameNode will not process cache reports or store information about block cache locations on the cluster. Note that NameNode will continute to store the path based cache locations in the file-system metadata, even though it will not act on this information until the caching is enabled. The default value for this parameter is true (i.e. centralized caching is enabled).
|
||||
|
||||
|
||||
### OS Limits
|
||||
|
||||
If you get the error "Cannot start datanode because the configured max locked memory size... is more than the datanode's available RLIMIT\_MEMLOCK ulimit," that means that the operating system is imposing a lower limit on the amount of memory that you can lock than what you have configured. To fix this, you must adjust the ulimit -l value that the DataNode runs with. Usually, this value is configured in `/etc/security/limits.conf`. However, it will vary depending on what operating system and distribution you are using.
|
||||
|
@ -22,6 +22,7 @@
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
|
||||
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -1556,4 +1557,52 @@ public void testNoLookupsWhenNotUsed() throws Exception {
|
||||
cm.setCachedLocations(locations);
|
||||
Mockito.verifyZeroInteractions(locations);
|
||||
}
|
||||
|
||||
@Test(timeout=120000)
|
||||
public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
|
||||
throws Exception {
|
||||
cluster.shutdown();
|
||||
HdfsConfiguration config = createCachingConf();
|
||||
config.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
|
||||
cluster = new MiniDFSCluster.Builder(config)
|
||||
.numDataNodes(NUM_DATANODES).build();
|
||||
|
||||
cluster.waitActive();
|
||||
dfs = cluster.getFileSystem();
|
||||
namenode = cluster.getNameNode();
|
||||
CacheManager cacheManager = namenode.getNamesystem().getCacheManager();
|
||||
assertFalse(cacheManager.isEnabled());
|
||||
assertNull(cacheManager.getCacheReplicationMonitor());
|
||||
// Create the pool
|
||||
String pool = "pool1";
|
||||
namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
|
||||
// Create some test files
|
||||
final int numFiles = 2;
|
||||
final int numBlocksPerFile = 2;
|
||||
final List<String> paths = new ArrayList<String>(numFiles);
|
||||
for (int i=0; i<numFiles; i++) {
|
||||
Path p = new Path("/testCachePaths-" + i);
|
||||
FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
|
||||
(int)BLOCK_SIZE);
|
||||
paths.add(p.toUri().getPath());
|
||||
}
|
||||
// Check the initial statistics at the namenode
|
||||
waitForCachedBlocks(namenode, 0, 0,
|
||||
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
|
||||
// Cache and check each path in sequence
|
||||
int expected = 0;
|
||||
for (int i=0; i<numFiles; i++) {
|
||||
CacheDirectiveInfo directive =
|
||||
new CacheDirectiveInfo.Builder().
|
||||
setPath(new Path(paths.get(i))).
|
||||
setPool(pool).
|
||||
build();
|
||||
dfs.addCacheDirective(directive);
|
||||
waitForCachedBlocks(namenode, expected, 0,
|
||||
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
|
||||
}
|
||||
Thread.sleep(20000);
|
||||
waitForCachedBlocks(namenode, expected, 0,
|
||||
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user