YARN-6247. Share a single instance of SubClusterResolver instead of instantiating one per AM. (Botong Huang via Subru)

(cherry picked from commit 51aeb2ce0c599176aca9466a939c3ad55df30036)
This commit is contained in:
Subru Krishnan 2017-03-02 18:54:53 -08:00 committed by Carlo Curino
parent 8623644f45
commit 86b2bec56e
5 changed files with 59 additions and 12 deletions

View File

@ -2594,6 +2594,12 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String FEDERATION_MACHINE_LIST = public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list"; FEDERATION_PREFIX + "machine-list";
public static final String FEDERATION_CLUSTER_RESOLVER_CLASS =
FEDERATION_PREFIX + "subcluster-resolver.class";
public static final String DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS =
"org.apache.hadoop.yarn.server.federation.resolver."
+ "DefaultSubClusterResolverImpl";
public static final String DEFAULT_FEDERATION_POLICY_KEY = "*"; public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX

View File

@ -2701,6 +2701,13 @@
</description> </description>
<name>yarn.federation.machine-list</name> <name>yarn.federation.machine-list</name>
</property> </property>
<property>
<description>
Class name for SubClusterResolver
</description>
<name>yarn.federation.subcluster-resolver.class</name>
<value>org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl</value>
</property>
<property> <property>
<description> <description>

View File

@ -21,8 +21,8 @@
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map; import java.util.Map;
/** /**
@ -31,9 +31,9 @@
*/ */
public abstract class AbstractSubClusterResolver implements SubClusterResolver { public abstract class AbstractSubClusterResolver implements SubClusterResolver {
private Map<String, SubClusterId> nodeToSubCluster = private Map<String, SubClusterId> nodeToSubCluster =
new HashMap<String, SubClusterId>(); new ConcurrentHashMap<String, SubClusterId>();
private Map<String, Set<SubClusterId>> rackToSubClusters = private Map<String, Set<SubClusterId>> rackToSubClusters =
new HashMap<String, Set<SubClusterId>>(); new ConcurrentHashMap<String, Set<SubClusterId>>();
@Override @Override
public SubClusterId getSubClusterForNode(String nodename) public SubClusterId getSubClusterForNode(String nodename)

View File

@ -25,8 +25,8 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/** /**
* An utility that helps to determine the sub-cluster that a specified node * An utility that helps to determine the sub-cluster that a specified node or
* belongs to. * rack belongs to. All implementing classes should be thread-safe.
*/ */
public interface SubClusterResolver extends Configurable { public interface SubClusterResolver extends Configurable {

View File

@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@ -90,6 +91,7 @@ public final class FederationStateStoreFacade {
private int cacheTimeToLive; private int cacheTimeToLive;
private Configuration conf; private Configuration conf;
private Cache<Object, Object> cache; private Cache<Object, Object> cache;
private SubClusterResolver subclusterResolver;
private FederationStateStoreFacade() { private FederationStateStoreFacade() {
initializeFacadeInternal(new Configuration()); initializeFacadeInternal(new Configuration());
@ -104,6 +106,12 @@ private void initializeFacadeInternal(Configuration config) {
FederationStateStore.class, createRetryPolicy(conf)); FederationStateStore.class, createRetryPolicy(conf));
this.stateStore.init(conf); this.stateStore.init(conf);
this.subclusterResolver = createInstance(conf,
YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS,
YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS,
SubClusterResolver.class);
this.subclusterResolver.load();
initCache(); initCache();
} catch (YarnException ex) { } catch (YarnException ex) {
@ -347,6 +355,15 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
return response.getApplicationHomeSubCluster().getHomeSubCluster(); return response.getApplicationHomeSubCluster().getHomeSubCluster();
} }
/**
* Get the singleton instance of SubClusterResolver.
*
* @return SubClusterResolver instance
*/
public SubClusterResolver getSubClusterResolver() {
return this.subclusterResolver;
}
/** /**
* Helper method to create instances of Object using the class name defined in * Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using * the configuration object. The instances creates {@link RetryProxy} using
@ -359,23 +376,40 @@ public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
* @param retryPolicy the policy for retrying method call failures * @param retryPolicy the policy for retrying method call failures
* @return a retry proxy for the specified interface * @return a retry proxy for the specified interface
*/ */
@SuppressWarnings("unchecked")
public static <T> Object createRetryInstance(Configuration conf, public static <T> Object createRetryInstance(Configuration conf,
String configuredClassName, String defaultValue, Class<T> type, String configuredClassName, String defaultValue, Class<T> type,
RetryPolicy retryPolicy) { RetryPolicy retryPolicy) {
return RetryProxy.create(type,
createInstance(conf, configuredClassName, defaultValue, type),
retryPolicy);
}
/**
* Helper method to create instances of Object using the class name specified
* in the configuration object.
*
* @param conf the yarn configuration
* @param configuredClassName the configuration provider key
* @param defaultValue the default implementation class
* @param type the required interface/base class
* @param <T> The type of the instance to create
* @return the instances created
*/
@SuppressWarnings("unchecked")
public static <T> T createInstance(Configuration conf,
String configuredClassName, String defaultValue, Class<T> type) {
String className = conf.get(configuredClassName, defaultValue); String className = conf.get(configuredClassName, defaultValue);
try { try {
Class<?> clusterResolverClass = conf.getClassByName(className); Class<?> clusterResolverClass = conf.getClassByName(className);
if (type.isAssignableFrom(clusterResolverClass)) { if (type.isAssignableFrom(clusterResolverClass)) {
return RetryProxy.create(type, return (T) ReflectionUtils.newInstance(clusterResolverClass, conf);
(T) ReflectionUtils.newInstance(clusterResolverClass, conf),
retryPolicy);
} else { } else {
throw new YarnRuntimeException( throw new YarnRuntimeException("Class: " + className
"Class: " + className + " not instance of " + type.getSimpleName()); + " not instance of " + type.getCanonicalName());
} }
} catch (Exception e) { } catch (ClassNotFoundException e) {
throw new YarnRuntimeException("Could not instantiate : " + className, e); throw new YarnRuntimeException("Could not instantiate : " + className, e);
} }
} }