YARN-11663. [Federation] Add Cache Entity Nums Limit. (#6662) Contributed by Shilun Fan.
Reviewed-by: Dinesh Chitlangia <dineshc@apache.org> Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
parent
f7d1ec2d9e
commit
5f3eb446f7
@ -4031,6 +4031,10 @@ public static boolean isAclEnabled(Configuration conf) {
|
||||
// 5 minutes
|
||||
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
|
||||
|
||||
public static final String FEDERATION_CACHE_ENTITY_NUMS =
|
||||
FEDERATION_PREFIX + "cache-entity.nums";
|
||||
public static final int DEFAULT_FEDERATION_CACHE_ENTITY_NUMS = 1000;
|
||||
|
||||
public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR =
|
||||
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
|
||||
public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true;
|
||||
|
@ -3787,6 +3787,15 @@
|
||||
<value>300</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
The number of entries in the Federation cache.
|
||||
default is 1000.
|
||||
</description>
|
||||
<name>yarn.federation.cache-entity.nums</name>
|
||||
<value>1000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The registry base directory for federation.</description>
|
||||
<name>yarn.federation.registry.base-dir</name>
|
||||
|
@ -27,15 +27,20 @@
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class FederationGuavaCache extends FederationCache {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FederationCache.class);
|
||||
|
||||
private Cache<String, CacheRequest<String, ?>> cache;
|
||||
|
||||
private int cacheTimeToLive;
|
||||
private long cacheEntityNums;
|
||||
|
||||
private String className = this.getClass().getSimpleName();
|
||||
|
||||
@ -52,6 +57,8 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) {
|
||||
// no conflict or pick up a specific one in the future.
|
||||
cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
|
||||
cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
|
||||
if (cacheTimeToLive <= 0) {
|
||||
isCachingEnabled = false;
|
||||
return;
|
||||
@ -59,8 +66,11 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) {
|
||||
this.setStateStore(pStateStore);
|
||||
|
||||
// Initialize Cache.
|
||||
LOG.info("Creating a JCache Manager with name {}. " +
|
||||
"Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive,
|
||||
cacheEntityNums);
|
||||
cache = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeToLive,
|
||||
TimeUnit.MILLISECONDS).build();
|
||||
TimeUnit.SECONDS).maximumSize(cacheEntityNums).build();
|
||||
isCachingEnabled = true;
|
||||
}
|
||||
|
||||
|
@ -26,32 +26,31 @@
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
import org.ehcache.Cache;
|
||||
import org.ehcache.CacheManager;
|
||||
import org.ehcache.config.builders.CacheConfigurationBuilder;
|
||||
import org.ehcache.config.builders.CacheManagerBuilder;
|
||||
import org.ehcache.config.builders.ExpiryPolicyBuilder;
|
||||
import org.ehcache.config.builders.ResourcePoolsBuilder;
|
||||
import org.ehcache.expiry.ExpiryPolicy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.cache.Cache;
|
||||
import javax.cache.CacheManager;
|
||||
import javax.cache.Caching;
|
||||
import javax.cache.configuration.FactoryBuilder;
|
||||
import javax.cache.configuration.MutableConfiguration;
|
||||
import javax.cache.expiry.CreatedExpiryPolicy;
|
||||
import javax.cache.expiry.Duration;
|
||||
import javax.cache.expiry.ExpiryPolicy;
|
||||
import javax.cache.spi.CachingProvider;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class FederationJCache extends FederationCache {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class);
|
||||
|
||||
private Cache<String, CacheRequest<String, ?>> cache;
|
||||
private Cache<String, CacheRequest> cache;
|
||||
|
||||
private int cacheTimeToLive;
|
||||
private long cacheEntityNums;
|
||||
|
||||
private boolean isCachingEnabled = false;
|
||||
|
||||
private String className = this.getClass().getSimpleName();
|
||||
private final String className = this.getClass().getSimpleName();
|
||||
|
||||
@Override
|
||||
public boolean isCachingEnabled() {
|
||||
@ -64,33 +63,35 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) {
|
||||
// no conflict or pick up a specific one in the future
|
||||
cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
|
||||
cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS,
|
||||
YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS);
|
||||
if (cacheTimeToLive <= 0) {
|
||||
isCachingEnabled = false;
|
||||
return;
|
||||
}
|
||||
this.setStateStore(pStateStore);
|
||||
CachingProvider jcacheProvider = Caching.getCachingProvider();
|
||||
CacheManager jcacheManager = jcacheProvider.getCacheManager();
|
||||
this.cache = jcacheManager.getCache(className);
|
||||
CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true);
|
||||
|
||||
if (this.cache == null) {
|
||||
LOG.info("Creating a JCache Manager with name {}.", className);
|
||||
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
|
||||
FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryPolicySingletonFactory =
|
||||
new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry));
|
||||
MutableConfiguration<String, CacheRequest<String, ?>> configuration =
|
||||
new MutableConfiguration<>();
|
||||
configuration.setStoreByValue(false);
|
||||
configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory);
|
||||
this.cache = jcacheManager.createCache(className, configuration);
|
||||
LOG.info("Creating a JCache Manager with name {}. " +
|
||||
"Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive,
|
||||
cacheEntityNums);
|
||||
// Set the number of caches
|
||||
ResourcePoolsBuilder poolsBuilder = ResourcePoolsBuilder.heap(cacheEntityNums);
|
||||
ExpiryPolicy expiryPolicy = ExpiryPolicyBuilder.timeToLiveExpiration(
|
||||
Duration.ofSeconds(cacheTimeToLive));
|
||||
CacheConfigurationBuilder<String, CacheRequest> configurationBuilder =
|
||||
CacheConfigurationBuilder.newCacheConfigurationBuilder(
|
||||
String.class, CacheRequest.class, poolsBuilder)
|
||||
.withExpiry(expiryPolicy);
|
||||
cache = cacheManager.createCache(className, configurationBuilder);
|
||||
}
|
||||
isCachingEnabled = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearCache() {
|
||||
CachingProvider jcacheProvider = Caching.getCachingProvider();
|
||||
CacheManager jcacheManager = jcacheProvider.getCacheManager();
|
||||
jcacheManager.destroyCache(className);
|
||||
|
||||
this.cache = null;
|
||||
}
|
||||
|
||||
@ -142,13 +143,12 @@ public void removeSubCluster(boolean flushCache) {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public Cache<String, CacheRequest<String, ?>> getCache() {
|
||||
public Cache<String, CacheRequest> getCache() {
|
||||
return cache;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getAppHomeSubClusterCacheKey(ApplicationId appId)
|
||||
throws YarnException {
|
||||
public String getAppHomeSubClusterCacheKey(ApplicationId appId) {
|
||||
return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID,
|
||||
appId.toString());
|
||||
}
|
||||
|
@ -57,7 +57,7 @@
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import javax.cache.Cache;
|
||||
import org.ehcache.Cache;
|
||||
|
||||
/**
|
||||
* Unit tests for FederationStateStoreFacade.
|
||||
@ -245,7 +245,7 @@ public void testGetApplicationHomeSubClusterCache() throws Exception {
|
||||
assert fedCache instanceof FederationJCache;
|
||||
FederationJCache jCache = (FederationJCache) fedCache;
|
||||
String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId);
|
||||
Cache<String, CacheRequest<String, ?>> cache = jCache.getCache();
|
||||
Cache<String, CacheRequest> cache = jCache.getCache();
|
||||
CacheRequest<String, ?> cacheRequest = cache.get(cacheKey);
|
||||
ApplicationHomeSubClusterCacheResponse response =
|
||||
ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());
|
||||
|
Loading…
Reference in New Issue
Block a user