YARN-3672. Create Facade for Federation State and Policy Store. Contributed by Subru Krishnan

(cherry picked from commit 5e976cd2b90ccf1bccb6039edf14140677804c4e)
This commit is contained in:
Jian He 2016-08-17 11:13:19 +08:00 committed by Carlo Curino
parent 52558df620
commit 5c84382397
9 changed files with 905 additions and 1 deletions

View File

@ -98,6 +98,9 @@
<apacheds.version>2.0.0-M21</apacheds.version> <apacheds.version>2.0.0-M21</apacheds.version>
<ldap-api.version>1.0.0-M33</ldap-api.version> <ldap-api.version>1.0.0-M33</ldap-api.version>
<jcache.version>1.0.0</jcache.version>
<ehcache.version>3.0.3</ehcache.version>
<!-- define the Java language version used by the compiler --> <!-- define the Java language version used by the compiler -->
<javac.version>1.8</javac.version> <javac.version>1.8</javac.version>
@ -1265,6 +1268,16 @@
<artifactId>kerb-simplekdc</artifactId> <artifactId>kerb-simplekdc</artifactId>
<version>1.0.0</version> <version>1.0.0</version>
</dependency> </dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

View File

@ -2560,6 +2560,19 @@ public static boolean isAclEnabled(Configuration conf) {
//////////////////////////////// ////////////////////////////////
public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation."; public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
FEDERATION_PREFIX + "state-store.class";
public static final String DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS =
"org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore";
public static final String FEDERATION_CACHE_TIME_TO_LIVE_SECS =
FEDERATION_PREFIX + "cache-ttl.secs";
// 5 minutes
public static final int DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS = 5 * 60;
public static final String FEDERATION_MACHINE_LIST = public static final String FEDERATION_MACHINE_LIST =
FEDERATION_PREFIX + "machine-list"; FEDERATION_PREFIX + "machine-list";

View File

@ -68,6 +68,10 @@ public void initializeMemberVariables() {
.YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL); .YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL);
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Federation default configs to be ignored
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
// Ignore blacklisting nodes for AM failures feature since it is still a // Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress" // "work in progress"
configurationPropsToSkipCompare.add(YarnConfiguration. configurationPropsToSkipCompare.add(YarnConfiguration.

View File

@ -2686,8 +2686,8 @@
<description>The arguments to pass to the Node label script.</description> <description>The arguments to pass to the Node label script.</description>
<name>yarn.nodemanager.node-labels.provider.script.opts</name> <name>yarn.nodemanager.node-labels.provider.script.opts</name>
</property> </property>
<!-- Other Configuration -->
<!-- Federation Configuration -->
<property> <property>
<description> <description>
Machine list file to be loaded by the FederationSubCluster Resolver Machine list file to be loaded by the FederationSubCluster Resolver
@ -2695,6 +2695,24 @@
<name>yarn.federation.machine-list</name> <name>yarn.federation.machine-list</name>
</property> </property>
<property>
<description>
Store class name for federation state store
</description>
<name>yarn.federation.state-store.class</name>
<value>org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore</value>
</property>
<property>
<description>
The time in seconds after which the federation state store local cache
will be refreshed periodically
</description>
<name>yarn.federation.cache-ttl.secs</name>
<value>300</value>
</property>
<!-- Other Configuration -->
<property> <property>
<description>The interval that the yarn client library uses to poll the <description>The interval that the yarn client library uses to poll the
completion status of the asynchronous API of application client protocol. completion status of the asynchronous API of application client protocol.

View File

@ -102,6 +102,16 @@
<groupId>org.fusesource.leveldbjni</groupId> <groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId> <artifactId>leveldbjni-all</artifactId>
</dependency> </dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>${ehcache.version}</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,532 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
*
* The FederationStateStoreFacade is an utility wrapper that provides singleton
* access to the Federation state store. It abstracts out retries and in
* addition, it also implements the caching for various objects.
*
*/
public final class FederationStateStoreFacade {
private static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreFacade.class);
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
private Cache<Object, Object> cache;
private FederationStateStoreFacade() {
initializeFacadeInternal(new Configuration());
}
private void initializeFacadeInternal(Configuration config) {
this.conf = config;
try {
this.stateStore = (FederationStateStore) createRetryInstance(this.conf,
YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
FederationStateStore.class, createRetryPolicy(conf));
this.stateStore.init(conf);
initCache();
} catch (YarnException ex) {
LOG.error("Failed to initialize the FederationStateStoreFacade object",
ex);
throw new RuntimeException(ex);
}
}
/**
* Delete and re-initialize the cache, to force it to use the given
* configuration.
*
* @param store the {@link FederationStateStore} instance to reinitialize with
* @param config the updated configuration to reinitialize with
*/
@VisibleForTesting
public synchronized void reinitialize(FederationStateStore store,
Configuration config) {
this.conf = config;
this.stateStore = store;
clearCache();
initCache();
}
public static RetryPolicy createRetryPolicy(Configuration conf) {
// Retry settings for StateStore
RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(
conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE),
conf.getLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS);
return retryPolicy;
}
private boolean isCachingEnabled() {
return (cacheTimeToLive > 0);
}
private void initCache() {
// Picking the JCache provider from classpath, need to make sure there's
// no conflict or pick up a specific one in the future
cacheTimeToLive =
conf.getInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
if (isCachingEnabled()) {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
this.cache = jcacheManager.getCache(this.getClass().getSimpleName());
if (this.cache == null) {
LOG.info("Creating a JCache Manager with name "
+ this.getClass().getSimpleName());
Duration cacheExpiry = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
CompleteConfiguration<Object, Object> configuration =
new MutableConfiguration<Object, Object>().setStoreByValue(false)
.setReadThrough(true)
.setExpiryPolicyFactory(
new FactoryBuilder.SingletonFactory<ExpiryPolicy>(
new CreatedExpiryPolicy(cacheExpiry)))
.setCacheLoaderFactory(
new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>(
new CacheLoaderImpl<Object, Object>()));
this.cache = jcacheManager.createCache(this.getClass().getSimpleName(),
configuration);
}
}
}
private void clearCache() {
CachingProvider jcacheProvider = Caching.getCachingProvider();
CacheManager jcacheManager = jcacheProvider.getCacheManager();
jcacheManager.destroyCache(this.getClass().getSimpleName());
this.cache = null;
}
/**
* Returns the singleton instance of the FederationStateStoreFacade object.
*
* @return the singleton {@link FederationStateStoreFacade} instance
*/
public static FederationStateStoreFacade getInstance() {
return FACADE;
}
/**
* Returns the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
* @return the sub cluster information
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
throws YarnException {
if (isCachingEnabled()) {
return getSubClusters(false).get(subClusterId);
} else {
return stateStore
.getSubCluster(GetSubClusterInfoRequest.newInstance(subClusterId))
.getSubClusterInfo();
}
}
/**
* Updates the cache with the central {@link FederationStateStore} and returns
* the {@link SubClusterInfo} for the specified {@link SubClusterId}.
*
* @param subClusterId the identifier of the sub-cluster
* @param flushCache flag to indicate if the cache should be flushed or not
* @return the sub cluster information
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
final boolean flushCache) throws YarnException {
if (flushCache && isCachingEnabled()) {
LOG.info("Flushing subClusters from cache and rehydrating from store,"
+ " most likely on account of RM failover.");
cache.remove(buildGetSubClustersCacheRequest(false));
}
return getSubCluster(subClusterId);
}
/**
* Returns the {@link SubClusterInfo} of all active sub cluster(s).
*
* @param filterInactiveSubClusters whether to filter out inactive
* sub-clusters
* @return the information of all active sub cluster(s)
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<SubClusterId, SubClusterInfo> getSubClusters(
final boolean filterInactiveSubClusters) throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<SubClusterId, SubClusterInfo>) cache
.get(buildGetSubClustersCacheRequest(filterInactiveSubClusters));
} else {
return buildSubClusterInfoMap(stateStore.getSubClusters(
GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters)));
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}
/**
* Returns the {@link SubClusterPolicyConfiguration} for the specified queue.
*
* @param queue the queue whose policy is required
* @return the corresponding configured policy
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterPolicyConfiguration getPolicyConfiguration(
final String queue) throws YarnException {
if (isCachingEnabled()) {
return getPoliciesConfigurations().get(queue);
} else {
return stateStore
.getPolicyConfiguration(
GetSubClusterPolicyConfigurationRequest.newInstance(queue))
.getPolicyConfiguration();
}
}
/**
* Get the policies that is represented as
* {@link SubClusterPolicyConfiguration} for all currently active queues in
* the system.
*
* @return the policies for all currently active queues in the system
* @throws YarnException if the call to the state store is unsuccessful
*/
@SuppressWarnings("unchecked")
public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
throws YarnException {
try {
if (isCachingEnabled()) {
return (Map<String, SubClusterPolicyConfiguration>) cache
.get(buildGetPoliciesConfigurationsCacheRequest());
} else {
return buildPolicyConfigMap(stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest.newInstance()));
}
} catch (Throwable ex) {
throw new YarnException(ex);
}
}
/**
* Adds the home {@link SubClusterId} for the specified {@link ApplicationId}.
*
* @param appHomeSubCluster the mapping of the application to it's home
* sub-cluster
* @throws YarnException if the call to the state store is unsuccessful
*/
public void addApplicationHomeSubCluster(
ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
stateStore.addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
return;
}
/**
* Updates the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appHomeSubCluster the mapping of the application to it's home
* sub-cluster
* @throws YarnException if the call to the state store is unsuccessful
*/
public void updateApplicationHomeSubCluster(
ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
stateStore.updateApplicationHomeSubCluster(
UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster));
return;
}
/**
* Returns the home {@link SubClusterId} for the specified
* {@link ApplicationId}.
*
* @param appId the identifier of the application
* @return the home sub cluster identifier
* @throws YarnException if the call to the state store is unsuccessful
*/
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId));
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
/**
* Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using
* the specific {@link RetryPolicy}.
*
* @param conf the yarn configuration
* @param configuredClassName the configuration provider key
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
* @return a retry proxy for the specified interface
*/
@SuppressWarnings("unchecked")
public static <T> Object createRetryInstance(Configuration conf,
String configuredClassName, String defaultValue, Class<T> type,
RetryPolicy retryPolicy) {
String className = conf.get(configuredClassName, defaultValue);
try {
Class<?> clusterResolverClass = conf.getClassByName(className);
if (type.isAssignableFrom(clusterResolverClass)) {
return RetryProxy.create(type,
(T) ReflectionUtils.newInstance(clusterResolverClass, conf),
retryPolicy);
} else {
throw new YarnRuntimeException(
"Class: " + className + " not instance of " + type.getSimpleName());
}
} catch (Exception e) {
throw new YarnRuntimeException("Could not instantiate : " + className, e);
}
}
private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
final GetSubClustersInfoResponse response) {
List<SubClusterInfo> subClusters = response.getSubClusters();
Map<SubClusterId, SubClusterInfo> subClustersMap =
new HashMap<>(subClusters.size());
for (SubClusterInfo subCluster : subClusters) {
subClustersMap.put(subCluster.getSubClusterId(), subCluster);
}
return subClustersMap;
}
private Object buildGetSubClustersCacheRequest(
final boolean filterInactiveSubClusters) {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_SUBCLUSTERS_CACHEID, null);
CacheRequest<String, Map<SubClusterId, SubClusterInfo>> cacheRequest =
new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(cacheKey,
new Func<String, Map<SubClusterId, SubClusterInfo>>() {
@Override
public Map<SubClusterId, SubClusterInfo> invoke(String key)
throws Exception {
GetSubClustersInfoResponse subClusters =
stateStore.getSubClusters(GetSubClustersInfoRequest
.newInstance(filterInactiveSubClusters));
return buildSubClusterInfoMap(subClusters);
}
});
return cacheRequest;
}
private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
GetSubClusterPoliciesConfigurationsResponse response) {
List<SubClusterPolicyConfiguration> policyConfigs =
response.getPoliciesConfigs();
Map<String, SubClusterPolicyConfiguration> queuePolicyConfigs =
new HashMap<>();
for (SubClusterPolicyConfiguration policyConfig : policyConfigs) {
queuePolicyConfigs.put(policyConfig.getQueue(), policyConfig);
}
return queuePolicyConfigs;
}
private Object buildGetPoliciesConfigurationsCacheRequest() {
final String cacheKey = buildCacheKey(getClass().getSimpleName(),
GET_POLICIES_CONFIGURATIONS_CACHEID, null);
CacheRequest<String, Map<String, SubClusterPolicyConfiguration>> cacheRequest =
new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>(
cacheKey,
new Func<String, Map<String, SubClusterPolicyConfiguration>>() {
@Override
public Map<String, SubClusterPolicyConfiguration> invoke(
String key) throws Exception {
GetSubClusterPoliciesConfigurationsResponse policyConfigs =
stateStore.getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest
.newInstance());
return buildPolicyConfigMap(policyConfigs);
}
});
return cacheRequest;
}
protected String buildCacheKey(String typeName, String methodName,
String argName) {
StringBuilder buffer = new StringBuilder();
buffer.append(typeName).append(".");
buffer.append(methodName);
if (argName != null) {
buffer.append("::");
buffer.append(argName);
}
return buffer.toString();
}
/**
* Internal class that implements the CacheLoader interface that can be
* plugged into the CacheManager to load objects into the cache for specified
* keys.
*/
private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {
@SuppressWarnings("unchecked")
@Override
public V load(K key) throws CacheLoaderException {
try {
CacheRequest<K, V> query = (CacheRequest<K, V>) key;
assert query != null;
return query.getValue();
} catch (Throwable ex) {
throw new CacheLoaderException(ex);
}
}
@Override
public Map<K, V> loadAll(Iterable<? extends K> keys)
throws CacheLoaderException {
// The FACADE does not use the Cache's getAll API. Hence this is not
// required to be implemented
throw new NotImplementedException();
}
}
/**
* Internal class that encapsulates the cache key and a function that returns
* the value for the specified key.
*/
private static class CacheRequest<K, V> {
private K key;
private Func<K, V> func;
public CacheRequest(K key, Func<K, V> func) {
this.key = key;
this.func = func;
}
public V getValue() throws Exception {
return func.invoke(key);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
CacheRequest<K, V> other = (CacheRequest<K, V>) obj;
if (key == null) {
if (other.key != null) {
return false;
}
} else if (!key.equals(other.key)) {
return false;
}
return true;
}
}
/**
* Encapsulates a method that has one parameter and returns a value of the
* type specified by the TResult parameter.
*/
protected interface Func<T, TResult> {
TResult invoke(T input) throws Exception;
}
}

View File

@ -0,0 +1,17 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.util.MonotonicClock;
/**
* Utility class for FederationStateStore unit tests.
*/
public class FederationStateStoreTestUtil {
private static final MonotonicClock CLOCK = new MonotonicClock();
public static final String SC_PREFIX = "SC-";
public static final String Q_PREFIX = "queue-";
public static final String POLICY_PREFIX = "policy-";
private FederationStateStore stateStore;
public FederationStateStoreTestUtil(FederationStateStore stateStore) {
this.stateStore = stateStore;
}
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
String clientRMAddress = "1.2.3.4:2";
String rmAdminAddress = "1.2.3.4:3";
String webAppAddress = "1.2.3.4:4";
return SubClusterInfo.newInstance(subClusterId, amRMAddress,
clientRMAddress, rmAdminAddress, webAppAddress, SubClusterState.SC_NEW,
CLOCK.getTime(), "capability");
}
private void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
stateStore.registerSubCluster(
SubClusterRegisterRequest.newInstance(subClusterInfo));
}
public void registerSubClusters(int numSubClusters) throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
registerSubCluster(SubClusterId.newInstance(SC_PREFIX + i));
}
}
private void addApplicationHomeSC(ApplicationId appId,
SubClusterId subClusterId) throws YarnException {
ApplicationHomeSubCluster ahsc =
ApplicationHomeSubCluster.newInstance(appId, subClusterId);
AddApplicationHomeSubClusterRequest request =
AddApplicationHomeSubClusterRequest.newInstance(ahsc);
stateStore.addApplicationHomeSubCluster(request);
}
public void addAppsHomeSC(long clusterTs, int numApps) throws YarnException {
for (int i = 0; i < numApps; i++) {
addApplicationHomeSC(ApplicationId.newInstance(clusterTs, i),
SubClusterId.newInstance(SC_PREFIX + i));
}
}
private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
String policyType) {
return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
ByteBuffer.allocate(1));
}
private void setPolicyConf(String queue, String policyType)
throws YarnException {
SetSubClusterPolicyConfigurationRequest request =
SetSubClusterPolicyConfigurationRequest
.newInstance(createSCPolicyConf(queue, policyType));
stateStore.setPolicyConfiguration(request);
}
public void addPolicyConfigs(int numQueues) throws YarnException {
for (int i = 0; i < numQueues; i++) {
setPolicyConf(Q_PREFIX + i, POLICY_PREFIX + i);
}
}
public SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
return stateStore.getSubCluster(request).getSubClusterInfo();
}
public SubClusterId queryApplicationHomeSC(ApplicationId appId)
throws YarnException {
GetApplicationHomeSubClusterRequest request =
GetApplicationHomeSubClusterRequest.newInstance(appId);
GetApplicationHomeSubClusterResponse response =
stateStore.getApplicationHomeSubCluster(request);
return response.getApplicationHomeSubCluster().getHomeSubCluster();
}
public SubClusterPolicyConfiguration queryPolicyConfiguration(String queue)
throws YarnException {
GetSubClusterPolicyConfigurationRequest request =
GetSubClusterPolicyConfigurationRequest.newInstance(queue);
GetSubClusterPolicyConfigurationResponse result =
stateStore.getPolicyConfiguration(request);
return result.getPolicyConfiguration();
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Unit tests for FederationStateStoreFacade.
*/
@RunWith(Parameterized.class)
public class TestFederationStateStoreFacade {
@Parameters
public static Collection<Boolean[]> getParameters() {
return Arrays
.asList(new Boolean[][] {{Boolean.FALSE }, {Boolean.TRUE } });
}
private final long clusterTs = System.currentTimeMillis();
private final int numSubClusters = 3;
private final int numApps = 5;
private final int numQueues = 2;
private Configuration conf;
private FederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreTestUtil;
private FederationStateStoreFacade facade =
FederationStateStoreFacade.getInstance();
public TestFederationStateStoreFacade(Boolean isCachingEnabled) {
conf = new Configuration();
if (!(isCachingEnabled.booleanValue())) {
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
}
}
@Before
public void setUp() throws IOException, YarnException {
stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
facade.reinitialize(stateStore, conf);
// hydrate the store
stateStoreTestUtil = new FederationStateStoreTestUtil(stateStore);
stateStoreTestUtil.registerSubClusters(numSubClusters);
stateStoreTestUtil.addAppsHomeSC(clusterTs, numApps);
stateStoreTestUtil.addPolicyConfigs(numQueues);
}
@After
public void tearDown() throws Exception {
stateStore.close();
stateStore = null;
}
@Test
public void testGetSubCluster() throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
SubClusterId subClusterId =
SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
facade.getSubCluster(subClusterId));
}
}
@Test
public void testGetSubClusterFlushCache() throws YarnException {
for (int i = 0; i < numSubClusters; i++) {
SubClusterId subClusterId =
SubClusterId.newInstance(FederationStateStoreTestUtil.SC_PREFIX + i);
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
facade.getSubCluster(subClusterId, true));
}
}
@Test
public void testGetSubClusters() throws YarnException {
Map<SubClusterId, SubClusterInfo> subClusters =
facade.getSubClusters(false);
for (SubClusterId subClusterId : subClusters.keySet()) {
Assert.assertEquals(stateStoreTestUtil.querySubClusterInfo(subClusterId),
subClusters.get(subClusterId));
}
}
@Test
public void testGetPolicyConfiguration() throws YarnException {
for (int i = 0; i < numQueues; i++) {
String queue = FederationStateStoreTestUtil.Q_PREFIX + i;
Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
facade.getPolicyConfiguration(queue));
}
}
@Test
public void testGetPoliciesConfigurations() throws YarnException {
Map<String, SubClusterPolicyConfiguration> queuePolicies =
facade.getPoliciesConfigurations();
for (String queue : queuePolicies.keySet()) {
Assert.assertEquals(stateStoreTestUtil.queryPolicyConfiguration(queue),
queuePolicies.get(queue));
}
}
@Test
public void testGetHomeSubClusterForApp() throws YarnException {
for (int i = 0; i < numApps; i++) {
ApplicationId appId = ApplicationId.newInstance(clusterTs, i);
Assert.assertEquals(stateStoreTestUtil.queryApplicationHomeSC(appId),
facade.getApplicationHomeSubCluster(appId));
}
}
}