Revert "YARN-8010. Add config in FederationRMFailoverProxy to not bypass facade cache when failing over. (Botong Huang via Subru)."

This reverts commit 2a2ef15caf as smart-apply-patch script didn't pick the latest patch.
This commit is contained in:
Subru Krishnan 2018-03-28 11:26:50 -07:00
parent 47f711eebc
commit 725b10e3ae
3 changed files with 6 additions and 23 deletions

View File

@ -3089,18 +3089,15 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS = public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs"; FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_FLUSh_CACHE_FOR_RM_ADDR =
FEDERATION_PREFIX + "flush-cache-for-rm-addr";
public static final boolean DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR = true;
public static final String FEDERATION_REGISTRY_BASE_KEY = public static final String FEDERATION_REGISTRY_BASE_KEY =
FEDERATION_PREFIX + "registry.base-dir"; FEDERATION_PREFIX + "registry.base-dir";
public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY = public static final String DEFAULT_FEDERATION_REGISTRY_BASE_KEY =
"yarnfederation/"; "yarnfederation/";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS = public static final String FEDERATION_STATESTORE_HEARTBEAT_INTERVAL_SECS =
FEDERATION_PREFIX + "state-store.heartbeat-interval-secs"; FEDERATION_PREFIX + "state-store.heartbeat-interval-secs";

View File

@ -2924,15 +2924,6 @@
<value>300</value> <value>300</value>
</property> </property>
<property>
<description>
Whether to flush FederationStateStoreFacade cache to get subcluster info
when FederationRMFailoverProxyProvider is performing failover.
</description>
<name>yarn.federation.flush-cache-for-rm-addr</name>
<value>true</value>
</property>
<property> <property>
<description>The registry base directory for federation.</description> <description>The registry base directory for federation.</description>
<name>yarn.federation.registry.base-dir</name> <name>yarn.federation.registry.base-dir</name>

View File

@ -64,8 +64,7 @@ public class FederationRMFailoverProxyProvider<T>
private FederationStateStoreFacade facade; private FederationStateStoreFacade facade;
private SubClusterId subClusterId; private SubClusterId subClusterId;
private UserGroupInformation originalUser; private UserGroupInformation originalUser;
private boolean federationFailoverEnabled; private boolean federationFailoverEnabled = false;
private boolean flushFacadeCacheForYarnRMAddr;
@Override @Override
public void init(Configuration configuration, RMProxy<T> proxy, public void init(Configuration configuration, RMProxy<T> proxy,
@ -76,16 +75,13 @@ public void init(Configuration configuration, RMProxy<T> proxy,
String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID); String clusterId = configuration.get(YarnConfiguration.RM_CLUSTER_ID);
Preconditions.checkNotNull(clusterId, "Missing RM ClusterId"); Preconditions.checkNotNull(clusterId, "Missing RM ClusterId");
this.subClusterId = SubClusterId.newInstance(clusterId); this.subClusterId = SubClusterId.newInstance(clusterId);
this.facade = FederationStateStoreFacade.getInstance(); this.facade = facade.getInstance();
if (configuration instanceof YarnConfiguration) { if (configuration instanceof YarnConfiguration) {
this.conf = (YarnConfiguration) configuration; this.conf = (YarnConfiguration) configuration;
} }
federationFailoverEnabled = federationFailoverEnabled =
conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED); YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
flushFacadeCacheForYarnRMAddr =
conf.getBoolean(YarnConfiguration.FEDERATION_FLUSh_CACHE_FOR_RM_ADDR,
YarnConfiguration.DEFAULT_FEDERATION_FLUSh_CACHE_FOR_RM_ADDR);
conf.setInt( conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
@ -123,8 +119,7 @@ private T getProxyInternal(boolean isFailover) {
try { try {
LOG.info("Failing over to the ResourceManager for SubClusterId: {}", LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
subClusterId); subClusterId);
subClusterInfo = facade.getSubCluster(subClusterId, subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
this.flushFacadeCacheForYarnRMAddr && isFailover);
// updating the conf with the refreshed RM addresses as proxy // updating the conf with the refreshed RM addresses as proxy
// creations are based out of conf // creations are based out of conf
updateRMAddress(subClusterInfo); updateRMAddress(subClusterInfo);