diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0ab4107c13..650e82d673 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4031,6 +4031,10 @@ public static boolean isAclEnabled(Configuration conf) { // 5 minutes public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + public static final String FEDERATION_CACHE_ENTITY_NUMS = + FEDERATION_PREFIX + "cache-entity.nums"; + public static final int DEFAULT_FEDERATION_CACHE_ENTITY_NUMS = 1000; + public static final String FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = FEDERATION_PREFIX + "flush-cache-for-rm-addr"; public static final boolean DEFAULT_FEDERATION_FLUSH_CACHE_FOR_RM_ADDR = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 41e38f601c..6b2d2cd817 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3787,6 +3787,15 @@ 300 + + + The number of entries in the Federation cache. + default is 1000. + + yarn.federation.cache-entity.nums + 1000 + + The registry base directory for federation. yarn.federation.registry.base-dir diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java index 5ab0ef7721..2ba9e2869f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationGuavaCache.java @@ -27,15 +27,20 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.TimeUnit; public class FederationGuavaCache extends FederationCache { + private static final Logger LOG = LoggerFactory.getLogger(FederationCache.class); + private Cache> cache; private int cacheTimeToLive; + private long cacheEntityNums; private String className = this.getClass().getSimpleName(); @@ -52,6 +57,8 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) { // no conflict or pick up a specific one in the future. cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS); if (cacheTimeToLive <= 0) { isCachingEnabled = false; return; @@ -59,8 +66,11 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) { this.setStateStore(pStateStore); // Initialize Cache. + LOG.info("Creating a JCache Manager with name {}. " + + "Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive, + cacheEntityNums); cache = CacheBuilder.newBuilder().expireAfterWrite(cacheTimeToLive, - TimeUnit.MILLISECONDS).build(); + TimeUnit.SECONDS).maximumSize(cacheEntityNums).build(); isCachingEnabled = true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java index 4b530149b4..b4dbefe127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/cache/FederationJCache.java @@ -26,32 +26,31 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.ehcache.Cache; +import org.ehcache.CacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.ExpiryPolicyBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.expiry.ExpiryPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.cache.Cache; -import javax.cache.CacheManager; -import javax.cache.Caching; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.MutableConfiguration; -import javax.cache.expiry.CreatedExpiryPolicy; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.spi.CachingProvider; +import java.time.Duration; import java.util.Map; -import java.util.concurrent.TimeUnit; public class FederationJCache extends FederationCache { private static final Logger LOG = LoggerFactory.getLogger(FederationJCache.class); - private Cache> cache; + private Cache cache; private int cacheTimeToLive; + private long cacheEntityNums; private boolean isCachingEnabled = false; - private String className = this.getClass().getSimpleName(); + private final String className = this.getClass().getSimpleName(); @Override public boolean isCachingEnabled() { @@ -64,33 +63,35 @@ public void initCache(Configuration pConf, FederationStateStore pStateStore) { // no conflict or pick up a specific one in the future cacheTimeToLive = pConf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS); + cacheEntityNums = pConf.getLong(YarnConfiguration.FEDERATION_CACHE_ENTITY_NUMS, + YarnConfiguration.DEFAULT_FEDERATION_CACHE_ENTITY_NUMS); if (cacheTimeToLive <= 0) { isCachingEnabled = false; return; } this.setStateStore(pStateStore); - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - this.cache = jcacheManager.getCache(className); + CacheManager cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true); + if (this.cache == null) { - LOG.info("Creating a JCache Manager with name {}.", className); - Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive); - FactoryBuilder.SingletonFactory expiryPolicySingletonFactory = - new FactoryBuilder.SingletonFactory<>(new CreatedExpiryPolicy(cacheExpiry)); - MutableConfiguration> configuration = - new MutableConfiguration<>(); - configuration.setStoreByValue(false); - configuration.setExpiryPolicyFactory(expiryPolicySingletonFactory); - this.cache = jcacheManager.createCache(className, configuration); + LOG.info("Creating a JCache Manager with name {}. " + + "Cache TTL Time = {} secs. Cache Entity Nums = {}.", className, cacheTimeToLive, + cacheEntityNums); + // Set the number of caches + ResourcePoolsBuilder poolsBuilder = ResourcePoolsBuilder.heap(cacheEntityNums); + ExpiryPolicy expiryPolicy = ExpiryPolicyBuilder.timeToLiveExpiration( + Duration.ofSeconds(cacheTimeToLive)); + CacheConfigurationBuilder configurationBuilder = + CacheConfigurationBuilder.newCacheConfigurationBuilder( + String.class, CacheRequest.class, poolsBuilder) + .withExpiry(expiryPolicy); + cache = cacheManager.createCache(className, configurationBuilder); } isCachingEnabled = true; } @Override public void clearCache() { - CachingProvider jcacheProvider = Caching.getCachingProvider(); - CacheManager jcacheManager = jcacheProvider.getCacheManager(); - jcacheManager.destroyCache(className); + this.cache = null; } @@ -142,13 +143,12 @@ public void removeSubCluster(boolean flushCache) { } @VisibleForTesting - public Cache> getCache() { + public Cache getCache() { return cache; } @VisibleForTesting - public String getAppHomeSubClusterCacheKey(ApplicationId appId) - throws YarnException { + public String getAppHomeSubClusterCacheKey(ApplicationId appId) { return buildCacheKey(className, GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, appId.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java index e2192e8ae7..25f47ab7d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacade.java @@ -57,7 +57,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import javax.cache.Cache; +import org.ehcache.Cache; /** * Unit tests for FederationStateStoreFacade. @@ -245,7 +245,7 @@ public void testGetApplicationHomeSubClusterCache() throws Exception { assert fedCache instanceof FederationJCache; FederationJCache jCache = (FederationJCache) fedCache; String cacheKey = jCache.getAppHomeSubClusterCacheKey(appId); - Cache> cache = jCache.getCache(); + Cache cache = jCache.getCache(); CacheRequest cacheRequest = cache.get(cacheKey); ApplicationHomeSubClusterCacheResponse response = ApplicationHomeSubClusterCacheResponse.class.cast(cacheRequest.getValue());