diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 04b28985c7..658099a63d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3089,15 +3089,18 @@ public static boolean isAclEnabled(Configuration conf) { public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = FEDERATION_PREFIX + "cache-ttl.secs"; + // 5 minutes + public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; + + public static final String FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = + FEDERATION_PREFIX + "flush-cache-for-rm-addr"; + public static final boolean DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = true; public static final String FEDERATION_REGISTRY_BASE_KEY = FEDERATION_PREFIX + "registry.base-dir"; public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = "yarnfederation/"; - // 5 minutes - public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60; - public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 114ba4bc3f..2eba8df670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2924,6 +2924,15 @@ 300 + + + Whether to flush FederationStateStoreFacade cache to get subcluster info + when FederationRMFailoverProxyProvider is performing failover. + + yarn.federation.flush-cache-for-rm-addr + true + + The registry base directory for federation. yarn.federation.registry.base-dir diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java index c631208b78..b72b199359 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java @@ -64,7 +64,8 @@ public class FederationRMFailoverProxyProvider private FederationStateStoreFacade facade; private SubClusterId subClusterId; private UserGroupInformation originalUser; - private boolean federationFailoverEnabled = false; + private boolean federationFailoverEnabled; + private boolean flushFacadeCacheForYarnRMAddr; @Override public void init(Configuration configuration, RMProxy proxy, @@ -75,13 +76,16 @@ public void init(Configuration configuration, RMProxy proxy, String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); this.subClusterId = SubClusterId.newInstance(clusterId); - this.facade = facade.getInstance(); + this.facade = FederationStateStoreFacade.getInstance(); if (configuration instanceof YarnConfiguration) { this.conf = (YarnConfiguration) configuration; } federationFailoverEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); + flushFacadeCacheForYarnRMAddr = + conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR, + YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR); conf.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, @@ -119,7 +123,8 @@ private T getProxyInternal(boolean isFailover) { try { LOG.info("Failing over to the ResourceManager for SubClusterId: {}", subClusterId); - subClusterInfo = facade.getSubCluster(subClusterId, isFailover); + subClusterInfo = facade.getSubCluster(subClusterId, + this.flushFacadeCacheForYarnRMAddr && isFailover); // updating the conf with the refreshed RM addresses as proxy // creations are based out of conf updateRMAddress(subClusterInfo);