YARN-7707. BackPort [GPG] Policy generator framework. (#5810)

This commit is contained in:
slfan1989 2023-07-28 13:41:27 +08:00 committed by GitHub
parent 6dafd53626
commit 6d32a06125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1797 additions and 2 deletions

View File

@ -4390,6 +4390,43 @@ public class YarnConfiguration extends Configuration {
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";
/** Common prefix shared by all GPG policy generator configuration keys. */
public static final String FEDERATION_GPG_POLICY_PREFIX =
FEDERATION_GPG_PREFIX + "policy.generator.";
/** The interval at which the policy generator runs, default is one hour. */
public static final String GPG_POLICY_GENERATOR_INTERVAL =
FEDERATION_GPG_POLICY_PREFIX + "interval";
/** Default policy generator interval: one hour, expressed in milliseconds. */
public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL = TimeUnit.HOURS.toMillis(1);
/**
 * The interval, in milliseconds, at which the policy generator runs.
 * This property is deprecated and kept for compatibility only; prefer
 * `yarn.federation.gpg.policy.generator.interval`.
 */
public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
/**
* The configured policy generator class, runs NoOpGlobalPolicy by
* default.
*/
public static final String GPG_GLOBAL_POLICY_CLASS = FEDERATION_GPG_POLICY_PREFIX + "class";
/** Fully qualified class name of the default global policy (NoOpGlobalPolicy). */
public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
"org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." +
"NoOpGlobalPolicy";
/**
* Whether or not the policy generator is running in read only (won't modify
* policies), default is false.
*/
public static final String GPG_POLICY_GENERATOR_READONLY =
FEDERATION_GPG_POLICY_PREFIX + "readonly";
/** By default the policy generator is not read only. */
public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = false;
/**
* Which sub-clusters the policy generator should blacklist, given as a
* comma-separated list of sub-cluster ids.
*/
public static final String GPG_POLICY_GENERATOR_BLACKLIST =
FEDERATION_GPG_POLICY_PREFIX + "blacklist";
/**
* Connection and Read timeout from the Router to RM.
*/

View File

@ -5426,4 +5426,46 @@
<value>100ms</value>
</property>
<property>
<description>
The interval at which the policy generator runs, default is one hour
</description>
<name>yarn.federation.gpg.policy.generator.interval</name>
<value>1h</value>
</property>
<property>
<description>
The interval at which the policy generator runs, default is one hour.
This property is deprecated; please use
`yarn.federation.gpg.policy.generator.interval` instead.
</description>
<name>yarn.federation.gpg.policy.generator.interval-ms</name>
<value>3600000</value>
</property>
<property>
<description>
The configured policy generator class, runs NoOpGlobalPolicy by default
</description>
<name>yarn.federation.gpg.policy.generator.class</name>
<value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
</property>
<property>
<description>
Whether or not the policy generator is running in read only (won't modify policies), default is false
</description>
<name>yarn.federation.gpg.policy.generator.readonly</name>
<value>false</value>
</property>
<property>
<description>
Which subclusters the gpg should blacklist, default is none
</description>
<name>yarn.federation.gpg.policy.generator.blacklist</name>
<value></value>
</property>
</configuration>

View File

@ -61,6 +61,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-timelineservice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
@ -72,6 +78,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
@ -106,6 +118,12 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>src/test/resources/schedulerInfo1.json</exclude>
<exclude>src/test/resources/schedulerInfo2.json</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -28,4 +28,8 @@ public interface GPGContext {
/**
 * Returns the FederationStateStoreFacade held by this context.
 *
 * @return the state store facade.
 */
FederationStateStoreFacade getStateStoreFacade();
/**
 * Sets the FederationStateStoreFacade held by this context.
 *
 * @param facade the state store facade.
 */
void setStateStoreFacade(FederationStateStoreFacade facade);
/**
 * Returns the GPGPolicyFacade held by this context.
 *
 * @return the policy facade.
 */
GPGPolicyFacade getPolicyFacade();
/**
 * Sets the GPGPolicyFacade held by this context.
 *
 * @param facade the policy facade.
 */
void setPolicyFacade(GPGPolicyFacade facade);
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
public class GPGContextImpl implements GPGContext {
private FederationStateStoreFacade facade;
private GPGPolicyFacade policyFacade;
@Override
public FederationStateStoreFacade getStateStoreFacade() {
@ -38,4 +39,13 @@ public class GPGContextImpl implements GPGContext {
this.facade = federationStateStoreFacade;
}
/**
 * Returns the policy facade previously stored in this context.
 *
 * @return the stored GPGPolicyFacade, or null if none has been set.
 */
@Override
public GPGPolicyFacade getPolicyFacade() {
  return this.policyFacade;
}
/**
 * Stores the policy facade in this context.
 *
 * @param gpgPolicyfacade the GPGPolicyFacade to keep.
 */
@Override
public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade) {
  this.policyFacade = gpgPolicyfacade;
}
}

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
 * Helper used by the GPG Policy Generator to read policies from, and write
 * policies into, the FederationStateStore. It hides policy construction and
 * serialization details from the PolicyGenerator.
 *
 * Two methods are exposed:
 *
 * {@link #getPolicyManager(String)}
 * Looks up the PolicyManager for a queue name, returning null when no policy
 * is configured for that queue. The PolicyManager gives access to the
 * {@link FederationRouterPolicy} and {@link FederationAMRMProxyPolicy}, as
 * well as any policy specific parameters.
 *
 * {@link #setPolicyManager(FederationPolicyManager)}
 * Stores a PolicyManager. When its serialized configuration equals the cached
 * one nothing happens; otherwise the configuration is written into the
 * FederationStateStore and the internal caches are refreshed.
 *
 * The class is written under the assumption that the GPG is the only service
 * writing policies: a queue policy is read from the FederationStateStore only
 * while it is not cached yet, afterwards the cache is served.
 *
 * Two caches are kept, one of PolicyManagers and one of
 * SubClusterPolicyConfigurations. They serve reads and let the facade detect
 * whether a policy really changed, so that needless FederationStateStore
 * writes are skipped.
 */
public class GPGPolicyFacade {

  private static final Logger LOG =
      LoggerFactory.getLogger(GPGPolicyFacade.class);

  private FederationStateStoreFacade stateStore;

  // Caches keyed by queue name
  private Map<String, FederationPolicyManager> policyManagerMap;
  private Map<String, SubClusterPolicyConfiguration> policyConfMap;

  // When true, policies are never written into the state store
  private boolean readOnly;

  /**
   * Creates a facade with empty caches.
   *
   * @param stateStore facade used to reach the FederationStateStore.
   * @param conf configuration from which the read-only flag is read.
   */
  public GPGPolicyFacade(FederationStateStoreFacade stateStore,
      Configuration conf) {
    this.stateStore = stateStore;
    this.policyManagerMap = new HashMap<>();
    this.policyConfMap = new HashMap<>();
    this.readOnly = conf.getBoolean(
        YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
        YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
  }

  /**
   * Returns the policy manager of a queue, building it from the configuration
   * stored in the FederationStateStore when it is not cached yet. Because the
   * policy generator is expected to be the only component updating policies,
   * the reinitialization feature is not used here.
   *
   * @param queueName the name of the queue we want the policy manager for.
   * @return the policy manager responsible for the queue policy, or null when
   *         no policy configuration exists for the queue.
   * @throws YarnException exceptions from yarn servers.
   */
  public FederationPolicyManager getPolicyManager(String queueName)
      throws YarnException {
    FederationPolicyManager cachedManager = policyManagerMap.get(queueName);
    if (cachedManager != null) {
      return cachedManager;
    }
    try {
      return loadPolicyManager(queueName);
    } catch (YarnException e) {
      LOG.error("Error reading SubClusterPolicyConfiguration from state "
          + "store for queue: {}", queueName);
      throw e;
    }
  }

  /**
   * Builds the policy manager of a queue from its policy configuration and
   * records both in the caches.
   *
   * @param queueName the name of the queue.
   * @return the newly created policy manager, or null when neither the cache
   *         nor the state store holds a configuration for the queue.
   * @throws YarnException if the configuration cannot be read or the policy
   *         manager cannot be created from it.
   */
  private FederationPolicyManager loadPolicyManager(String queueName)
      throws YarnException {
    // Prefer the cached configuration, fall back to the state store
    SubClusterPolicyConfiguration policyConf = policyConfMap.get(queueName);
    if (policyConf == null) {
      policyConf = stateStore.getPolicyConfiguration(queueName);
    }

    // Still nothing: the FederationStateStore has no policy for this queue
    if (policyConf == null) {
      LOG.info("Read null policy for queue {}", queueName);
      return null;
    }

    FederationPolicyManager manager =
        FederationPolicyUtils.instantiatePolicyManager(policyConf.getType());
    manager.setQueue(queueName);

    // TODO there is currently no way to cleanly deserialize a policy
    // manager sub type from just the configuration
    if (!(manager instanceof WeightedLocalityPolicyManager)) {
      LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
          + "initialization may be incomplete ", manager.getClass());
    } else {
      WeightedPolicyInfo weights =
          WeightedPolicyInfo.fromByteBuffer(policyConf.getParams());
      LOG.info("Updating policy for queue {} to configured weights router: "
          + "{}, amrmproxy: {}", queueName,
          weights.getRouterPolicyWeights(),
          weights.getAMRMPolicyWeights());
      ((WeightedLocalityPolicyManager) manager).setWeightedPolicyInfo(weights);
    }

    policyManagerMap.put(queueName, manager);
    policyConfMap.put(queueName, policyConf);
    return manager;
  }

  /**
   * Writes a policy manager into the FederationStateStore. The write only
   * happens when the serialized policy configuration differs from the cached
   * one and the facade is not in read-only mode.
   *
   * @param policyManager The policy manager we want to update into the state
   *                      store. It contains policy information as well as
   *                      the queue name we will update for.
   * @throws YarnException exceptions from yarn servers.
   */
  public void setPolicyManager(FederationPolicyManager policyManager)
      throws YarnException {
    if (policyManager == null) {
      LOG.warn("Attempting to set null policy manager");
      return;
    }

    // The queue name and the configuration both come from the manager
    String queue = policyManager.getQueue();
    SubClusterPolicyConfiguration newConf;
    try {
      newConf = policyManager.serializeConf();
    } catch (FederationPolicyInitializationException e) {
      LOG.warn("Error serializing policy for queue {}", queue);
      throw e;
    }

    if (newConf == null) {
      // State store does not currently support setting a policy back to null
      // because it reads the queue name to set from the policy!
      LOG.warn("Skip setting policy to null for queue {} into state store",
          queue);
      return;
    }

    // Same configuration as the cached one: nothing to write
    if (confCacheEqual(queue, newConf)) {
      LOG.info("Setting unchanged policy - state store write skipped");
      return;
    }

    if (readOnly) {
      LOG.info("[read-only] Skipping policy update for queue {}", queue);
      return;
    }

    // Write the new configuration, then refresh both caches
    try {
      LOG.info("Updating policy for queue {} into state store", queue);
      stateStore.setPolicyConfiguration(newConf);
      policyConfMap.put(queue, newConf);
      policyManagerMap.put(queue, policyManager);
    } catch (YarnException e) {
      LOG.warn("Error writing SubClusterPolicyConfiguration to state "
          + "store for queue: {}", queue);
      throw e;
    }
  }

  /**
   * Compares a policy configuration with the one cached for a queue.
   *
   * @param queue the queue to check the cached policy configuration for
   * @param conf the new policy configuration
   * @return true when both are null or both are non-null and equal
   */
  private boolean confCacheEqual(String queue,
      SubClusterPolicyConfiguration conf) {
    SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
    if (conf == null || cachedConf == null) {
      return conf == null && cachedConf == null;
    }
    return conf.equals(cachedConf);
  }
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
/**
 * GPGUtils contains utility functions for the GPG.
 */
public final class GPGUtils {

  // hide constructor
  private GPGUtils() {
  }

  /**
   * Performs an invocation of the remote RMWebService: issues a GET on
   * {@code <webAddr>/ws/v1/cluster/<path>} accepting XML and converts the
   * response entity into the requested type.
   *
   * The Jersey client and the response are released before returning, on
   * both the success and the failure path, so repeated invocations do not
   * accumulate open connections.
   *
   * @param <T> Generic T.
   * @param webAddr WebAddress.
   * @param path url path.
   * @param returnType return type.
   * @return response entity.
   * @throws YarnRuntimeException if the response status is not 200 (OK).
   */
  public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType) {
    Client client = Client.create();
    ClientResponse response = null;
    try {
      WebResource webResource = client.resource(webAddr);
      response = webResource.path("ws/v1/cluster").path(path)
          .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
      if (response.getStatus() != HttpServletResponse.SC_OK) {
        throw new YarnRuntimeException("Bad response from remote web service: "
            + response.getStatus());
      }
      return response.getEntity(returnType);
    } finally {
      // Release the connection held by the response, then the client itself
      if (response != null) {
        response.close();
      }
      client.destroy();
    }
  }

  /**
   * Creates a uniform weighting of 1.0 for each sub cluster.
   *
   * @param ids subClusterId set
   * @return weight of subCluster.
   */
  public static Map<SubClusterIdInfo, Float> createUniformWeights(
      Set<SubClusterId> ids) {
    Map<SubClusterIdInfo, Float> weights = new HashMap<>();
    for (SubClusterId id : ids) {
      weights.put(new SubClusterIdInfo(id), 1.0f);
    }
    return weights;
  }
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,6 +68,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
private PolicyGenerator policyGenerator;
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
@ -94,13 +96,16 @@ public class GlobalPolicyGenerator extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Set up the context
this.gpgContext
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
this.gpgContext.setStateStoreFacade(FederationStateStoreFacade.getInstance());
GPGPolicyFacade gpgPolicyFacade =
new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
this.gpgContext.setPolicyFacade(gpgPolicyFacade);
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME);
@ -129,6 +134,35 @@ public class GlobalPolicyGenerator extends CompositeService {
LOG.info("Scheduled sub-cluster cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
// Schedule PolicyGenerator
// We recommend using yarn.federation.gpg.policy.generator.interval
// instead of yarn.federation.gpg.policy.generator.interval-ms
// To ensure compatibility,
// let's first obtain the value of "yarn.federation.gpg.policy.generator.interval-ms."
long policyGeneratorIntervalMillis = 0L;
String generatorIntervalMS = config.get(YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS);
if (generatorIntervalMS != null) {
LOG.warn("yarn.federation.gpg.policy.generator.interval-ms is deprecated property, " +
" we better set it yarn.federation.gpg.policy.generator.interval.");
policyGeneratorIntervalMillis = Long.parseLong(generatorIntervalMS);
}
// If it is not available, let's retrieve
// the value of "yarn.federation.gpg.policy.generator.interval" instead.
if (policyGeneratorIntervalMillis == 0) {
policyGeneratorIntervalMillis = config.getTimeDuration(
YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL,
YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL, TimeUnit.MILLISECONDS);
}
if(policyGeneratorIntervalMillis > 0){
this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS);
LOG.info("Scheduled policy-generator with interval: {}",
DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
}
}
@Override

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.Collections;
import java.util.Map;
/**
 * Base class of the pluggable policy that the PolicyGenerator uses to update
 * policies into the state store.
 */
public abstract class GlobalPolicy implements Configurable {

  private Configuration conf;

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  /**
   * Declares which objects the policy needs, as a map from object type to the
   * RM path it is requested from. The framework queries these paths and hands
   * the resulting objects to the policy. Leaving the queries to the
   * PolicyGenerator avoids duplicate calls to the same endpoints, since the
   * GlobalPolicy is invoked once per queue.
   *
   * @return a map of the object type and RM path; empty by default.
   */
  protected Map<Class, String> registerPaths() {
    // Nothing is registered unless a subclass overrides this method
    return Collections.emptyMap();
  }

  /**
   * Updates the policy of a queue so that it accounts for the cluster status.
   * This method defines the policy generator behavior.
   *
   * @param queueName name of the queue
   * @param clusterInfo subClusterId map to cluster information about the
   *                    SubCluster used to make policy decisions
   * @param manager the FederationPolicyManager for the queue's existing
   *                policy; the manager may be null, in which case the policy
   *                will need to be created
   * @return policy manager that handles the updated (or created) policy
   */
  protected abstract FederationPolicyManager updatePolicy(String queueName,
      Map<SubClusterId, Map<Class, Object>> clusterInfo,
      FederationPolicyManager manager);
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import java.util.Map;
/**
 * Default global policy: it never produces a policy update.
 */
public class NoOpGlobalPolicy extends GlobalPolicy {

  /**
   * Ignores all arguments and generates no policy.
   *
   * @param queueName name of the queue (unused)
   * @param clusterInfo per sub-cluster information (unused)
   * @param manager existing policy manager of the queue (unused)
   * @return always null
   */
  @Override
  public FederationPolicyManager updatePolicy(String queueName,
      Map<SubClusterId, Map<Class, Object>> clusterInfo,
      FederationPolicyManager manager) {
    // No-op: there is no policy manager to hand back
    return null;
  }
}

View File

@ -0,0 +1,268 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* The PolicyGenerator runs periodically and updates the policy configuration
* for each queue into the FederationStateStore. The policy update behavior is
* defined by the GlobalPolicy instance that is used.
*/
public class PolicyGenerator implements Runnable, Configurable {
private static final Logger LOG =
LoggerFactory.getLogger(PolicyGenerator.class);
private GPGContext gpgContext;
private Configuration conf;
// Information request map
private Map<Class, String> pathMap = new HashMap<>();
// Global policy instance
@VisibleForTesting
private GlobalPolicy policy;
/**
* The PolicyGenerator periodically reads SubCluster load and updates
* policies into the FederationStateStore.
*
* @param conf Configuration.
* @param context GPG Context.
*/
public PolicyGenerator(Configuration conf, GPGContext context) {
setConf(conf);
init(context);
}
private void init(GPGContext context) {
this.gpgContext = context;
LOG.info("Initialized PolicyGenerator");
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
this.policy = FederationStateStoreFacade.createInstance(conf,
YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS, GlobalPolicy.class);
policy.setConf(conf);
pathMap.putAll(policy.registerPaths());
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public final void run() {
Map<SubClusterId, SubClusterInfo> activeSubClusters;
try {
activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
} catch (YarnException e) {
LOG.error("Error retrieving active sub-clusters", e);
return;
}
// Parse the scheduler information from all the SCs
Map<SubClusterId, SchedulerInfo> schedInfo = getSchedulerInfo(activeSubClusters);
// Extract and enforce that all the schedulers have matching type
Set<String> queueNames = extractQueues(schedInfo);
// Remove black listed SubClusters
activeSubClusters.keySet().removeAll(getBlackList());
LOG.info("Active non-blacklist sub-clusters: {}",
activeSubClusters.keySet());
// Get cluster metrics information from non-black listed RMs - later used
// to evaluate SubCluster load
Map<SubClusterId, Map<Class, Object>> clusterInfo =
getInfos(activeSubClusters);
// Update into the FederationStateStore
for (String queueName : queueNames) {
// Retrieve the manager from the policy facade
FederationPolicyManager manager;
try {
manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
} catch (YarnException e) {
LOG.error("GetPolicy for queue {} failed", queueName, e);
continue;
}
LOG.info("Updating policy for queue {}", queueName);
manager = policy.updatePolicy(queueName, clusterInfo, manager);
try {
this.gpgContext.getPolicyFacade().setPolicyManager(manager);
} catch (YarnException e) {
LOG.error("SetPolicy for queue {} failed", queueName, e);
}
}
}
/**
* Helper to retrieve metrics from the RM REST endpoints.
*
* @param activeSubClusters A map of active SubCluster IDs to info
* @return Mapping relationship between SubClusterId and Metric.
*/
@VisibleForTesting
protected Map<SubClusterId, Map<Class, Object>> getInfos(
Map<SubClusterId, SubClusterInfo> activeSubClusters) {
Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
for (SubClusterInfo sci : activeSubClusters.values()) {
for (Map.Entry<Class, String> e : this.pathMap.entrySet()) {
if (!clusterInfo.containsKey(sci.getSubClusterId())) {
clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
}
Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
e.getValue(), e.getKey());
clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
}
}
return clusterInfo;
}
/**
* Helper to retrieve SchedulerInfos.
*
* @param activeSubClusters A map of active SubCluster IDs to info
* @return Mapping relationship between SubClusterId and SubClusterInfo.
*/
@VisibleForTesting
protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
Map<SubClusterId, SubClusterInfo> activeSubClusters) {
Map<SubClusterId, SchedulerInfo> schedInfo =
new HashMap<>();
for (SubClusterInfo sci : activeSubClusters.values()) {
SchedulerTypeInfo sti = GPGUtils
.invokeRMWebService(sci.getRMWebServiceAddress(),
RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
if(sti != null){
schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
} else {
LOG.warn("Skipped null scheduler info from SubCluster " + sci
.getSubClusterId().toString());
}
}
return schedInfo;
}
/**
* Helper to get a set of blacklisted SubCluster Ids from configuration.
*/
private Set<SubClusterId> getBlackList() {
String blackListParam =
conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST);
if(blackListParam == null){
return Collections.emptySet();
}
Set<SubClusterId> blackList = new HashSet<>();
for (String id : blackListParam.split(",")) {
blackList.add(SubClusterId.newInstance(id));
}
return blackList;
}
/**
* Given the scheduler information for all RMs, extract the union of
* queue names - right now we only consider instances of capacity scheduler.
*
* @param schedInfo the scheduler information
* @return a set of queue names
*/
private Set<String> extractQueues(Map<SubClusterId, SchedulerInfo> schedInfo) {
Set<String> queueNames = new HashSet<>();
for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
if (entry.getValue() instanceof CapacitySchedulerInfo) {
// Flatten the queue structure and get only non leaf queues
queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
.get(CapacitySchedulerQueueInfo.class));
} else {
LOG.warn("Skipping SubCluster {}, not configured with capacity scheduler.",
entry.getKey());
}
}
return queueNames;
}
// Helpers to flatten the queue structure into a multimap of
// queue type to set of queue names
private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
Map<Class, Set<String>> flattened = new HashMap<>();
addOrAppend(flattened, csi.getClass(), csi.getQueueName());
for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
flattenQueue(csqi, flattened);
}
return flattened;
}
/**
 * Records the given queue under its runtime class and then descends into
 * its child queues, if it has any.
 *
 * @param csi the queue to record
 * @param flattened the multimap of queue type to queue names being built
 */
private void flattenQueue(CapacitySchedulerQueueInfo csi,
    Map<Class, Set<String>> flattened) {
  addOrAppend(flattened, csi.getClass(), csi.getQueueName());
  if (csi.getQueues() == null) {
    // No children to visit.
    return;
  }
  for (CapacitySchedulerQueueInfo child : csi.getQueues().getQueueInfoList()) {
    flattenQueue(child, flattened);
  }
}
/**
 * Adds a value to the set stored under the given key, creating the set
 * first when the key has no mapping yet.
 *
 * @param multimap the map of key to set of values to update
 * @param key the key whose set receives the value
 * @param value the value to add
 * @param <K> the key type
 * @param <V> the value type
 */
private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
  // Single lookup instead of containsKey + put + get.
  multimap.computeIfAbsent(key, k -> new HashSet<>()).add(value);
}
/**
 * Returns the global policy currently held by this generator.
 *
 * @return the policy field as last assigned
 */
public GlobalPolicy getPolicy() {
  return this.policy;
}
/**
 * Replaces the global policy held by this generator.
 *
 * @param policy the policy to store
 */
public void setPolicy(GlobalPolicy policy) {
this.policy = policy;
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Simple policy that generates and updates uniform weighted locality
 * policies.
 */
public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy {

  private static final Logger LOG =
      LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class);

  /**
   * Sets uniform AMRM and router weights over all sub-clusters present in
   * {@code clusterInfo}.
   *
   * <p>When no manager exists yet, a new
   * {@link WeightedLocalityPolicyManager} is created for the queue. A manager
   * of any other type is returned unchanged and the mismatch is logged.
   *
   * @param queueName the queue whose policy is being updated
   * @param clusterInfo per sub-cluster information; only its key set is used
   * @param currentManager the existing policy manager, or null if none
   * @return the created or updated manager, or the untouched
   *         {@code currentManager} when it is of a different type
   */
  @Override
  protected FederationPolicyManager updatePolicy(String queueName,
      Map<SubClusterId, Map<Class, Object>> clusterInfo, FederationPolicyManager currentManager) {
    FederationPolicyManager manager = currentManager;
    if (manager == null) {
      // Only create the manager here; its weights are assigned by the
      // update path below, so they need not be computed twice.
      LOG.info("Creating uniform weighted policy queue {}.", queueName);
      WeightedLocalityPolicyManager created = new WeightedLocalityPolicyManager();
      created.setQueue(queueName);
      manager = created;
    }
    if (manager instanceof WeightedLocalityPolicyManager) {
      LOG.info("Updating policy for queue {} to default weights.", queueName);
      WeightedLocalityPolicyManager wlpmanager = (WeightedLocalityPolicyManager) manager;
      Map<SubClusterIdInfo, Float> uniformWeights =
          GPGUtils.createUniformWeights(clusterInfo.keySet());
      wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights(uniformWeights);
      wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights(uniformWeights);
    } else {
      LOG.info("Policy for queue {} is of type {}, expected {}",
          queueName, manager.getClass(), WeightedLocalityPolicyManager.class);
    }
    return manager;
  }
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Classes comprising the policy generator for the GPG. Responsibilities include
* generating and updating policies based on the cluster status.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
 * Unit test for GPG Policy Facade.
 */
public class TestGPGPolicyFacade {

  private static final String TEST_QUEUE = "test-queue";

  private Configuration conf;
  private FederationStateStore stateStore;
  private FederationStateStoreFacade facade =
      FederationStateStoreFacade.getInstance();
  private GPGPolicyFacade policyFacade;
  private Set<SubClusterId> subClusterIds;
  private SubClusterPolicyConfiguration testConf;

  public TestGPGPolicyFacade() {
    conf = new Configuration();
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
    subClusterIds = new HashSet<>();
    for (int i = 0; i < 3; i++) {
      subClusterIds.add(SubClusterId.newInstance("sc" + i));
    }
  }

  /**
   * Builds a weighted locality manager for the given queue whose AMRM and
   * router weights are uniform over the current test sub-clusters.
   */
  private WeightedLocalityPolicyManager newUniformManager(String queue)
      throws YarnException {
    WeightedLocalityPolicyManager uniform = new WeightedLocalityPolicyManager();
    uniform.setQueue(queue);
    uniform.getWeightedPolicyInfo().setAMRMPolicyWeights(
        GPGUtils.createUniformWeights(subClusterIds));
    uniform.getWeightedPolicyInfo().setRouterPolicyWeights(
        GPGUtils.createUniformWeights(subClusterIds));
    return uniform;
  }

  @Before
  public void setUp() throws IOException, YarnException {
    stateStore = new MemoryFederationStateStore();
    stateStore.init(conf);
    facade.reinitialize(stateStore, conf);
    policyFacade = new GPGPolicyFacade(facade, conf);

    // Add a test policy for test queue
    testConf = newUniformManager(TEST_QUEUE).serializeConf();
    SetSubClusterPolicyConfigurationRequest request =
        SetSubClusterPolicyConfigurationRequest.newInstance(testConf);
    stateStore.setPolicyConfiguration(request);
  }

  @After
  public void tearDown() throws Exception {
    stateStore.close();
    stateStore = null;
  }

  /**
   * Test that the policy stored during set up is read back unchanged.
   */
  @Test
  public void testGetPolicy() throws YarnException {
    FederationPolicyManager fetched = policyFacade.getPolicyManager(TEST_QUEUE);
    WeightedLocalityPolicyManager stored = (WeightedLocalityPolicyManager) fetched;
    Assert.assertEquals(testConf, stored.serializeConf());
  }

  /**
   * Test that new policies are written into the state store.
   */
  @Test
  public void testSetNewPolicy() throws YarnException {
    WeightedLocalityPolicyManager written = newUniformManager(TEST_QUEUE + 0);
    SubClusterPolicyConfiguration expected = written.serializeConf();
    policyFacade.setPolicyManager(written);

    WeightedLocalityPolicyManager stored =
        (WeightedLocalityPolicyManager) policyFacade
            .getPolicyManager(TEST_QUEUE + 0);
    Assert.assertEquals(expected, stored.serializeConf());
  }

  /**
   * Test that overwriting policies are updated in the state store.
   */
  @Test
  public void testOverwritePolicy() throws YarnException {
    // A fourth sub-cluster makes the new weights differ from the stored ones
    subClusterIds.add(SubClusterId.newInstance("sc3"));
    WeightedLocalityPolicyManager written = newUniformManager(TEST_QUEUE);
    SubClusterPolicyConfiguration expected = written.serializeConf();
    policyFacade.setPolicyManager(written);

    WeightedLocalityPolicyManager stored =
        (WeightedLocalityPolicyManager) policyFacade
            .getPolicyManager(TEST_QUEUE);
    Assert.assertEquals(expected, stored.serializeConf());
  }

  /**
   * Test that the write through cache works.
   */
  @Test
  public void testWriteCache() throws YarnException {
    stateStore = mock(MemoryFederationStateStore.class);
    facade.reinitialize(stateStore, conf);
    GetSubClusterPolicyConfigurationResponse stubbed =
        GetSubClusterPolicyConfigurationResponse.newInstance(testConf);
    when(stateStore.getPolicyConfiguration(Matchers.any(
        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(stubbed);
    policyFacade = new GPGPolicyFacade(facade, conf);

    // Query once to fill the cache
    FederationPolicyManager cached = policyFacade.getPolicyManager(TEST_QUEUE);
    // State store should be contacted once
    verify(stateStore, times(1)).getPolicyConfiguration(
        Matchers.any(GetSubClusterPolicyConfigurationRequest.class));

    // If we set the same policy, the state store should be untouched
    policyFacade.setPolicyManager(cached);
    verify(stateStore, times(0)).setPolicyConfiguration(
        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
  }

  /**
   * Test that when read only is enabled, the state store is not changed.
   */
  @Test
  public void testReadOnly() throws YarnException {
    conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true);
    stateStore = mock(MemoryFederationStateStore.class);
    facade.reinitialize(stateStore, conf);
    GetSubClusterPolicyConfigurationResponse stubbed =
        GetSubClusterPolicyConfigurationResponse.newInstance(testConf);
    when(stateStore.getPolicyConfiguration(Matchers.any(
        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(stubbed);
    policyFacade = new GPGPolicyFacade(facade, conf);

    // If we set a policy, the state store should be untouched
    policyFacade.setPolicyManager(newUniformManager(TEST_QUEUE));
    verify(stateStore, times(0)).setPolicyConfiguration(
        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
  }
}

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
import com.sun.jersey.api.json.JSONConfiguration;
import com.sun.jersey.api.json.JSONJAXBContext;
import com.sun.jersey.api.json.JSONUnmarshaller;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit test for GPG Policy Generator.
 */
public class TestPolicyGenerator {

  private static final int NUM_SC = 3;

  private Configuration conf;
  private FederationStateStore stateStore;
  private FederationStateStoreFacade facade =
      FederationStateStoreFacade.getInstance();

  private List<SubClusterId> subClusterIds;
  private Map<SubClusterId, SubClusterInfo> subClusterInfos;
  private Map<SubClusterId, Map<Class, Object>> clusterInfos;
  private Map<SubClusterId, SchedulerInfo> schedulerInfos;

  private GPGContext gpgContext;
  private PolicyGenerator policyGenerator;

  public TestPolicyGenerator() {
    conf = new Configuration();
    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);

    gpgContext = new GPGContextImpl();
    gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
    gpgContext.setStateStoreFacade(facade);
  }

  /**
   * Creates NUM_SC running sub-clusters with faked metrics and scheduler
   * information, and backs the facade with a mocked state store that reports
   * them.
   */
  @Before
  public void setUp() throws IOException, YarnException, JAXBException {
    subClusterIds = new ArrayList<>();
    subClusterInfos = new HashMap<>();
    clusterInfos = new HashMap<>();
    schedulerInfos = new HashMap<>();

    CapacitySchedulerInfo sti1 =
        readJSON("src/test/resources/schedulerInfo1.json",
            CapacitySchedulerInfo.class);
    CapacitySchedulerInfo sti2 =
        readJSON("src/test/resources/schedulerInfo2.json",
            CapacitySchedulerInfo.class);

    // Set up sub clusters
    for (int i = 0; i < NUM_SC; ++i) {
      // Sub cluster Id
      SubClusterId id = SubClusterId.newInstance("sc" + i);
      subClusterIds.add(id);

      // Sub cluster info
      SubClusterInfo cluster = SubClusterInfo
          .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
              "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
      subClusterInfos.put(id, cluster);

      // Cluster metrics info
      ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
      metricsInfo.setAppsPending(2000);
      clusterInfos.computeIfAbsent(id, k -> new HashMap<>())
          .put(ClusterMetricsInfo.class, metricsInfo);

      schedulerInfos.put(id, sti1);
    }

    // Change one of the sub cluster schedulers
    schedulerInfos.put(subClusterIds.get(0), sti2);

    stateStore = mock(FederationStateStore.class);
    when(stateStore.getSubClusters(any()))
        .thenReturn(GetSubClustersInfoResponse.newInstance(
            new ArrayList<>(subClusterInfos.values())));
    facade.reinitialize(stateStore, conf);
  }

  @After
  public void tearDown() throws Exception {
    stateStore.close();
    stateStore = null;
  }

  /**
   * Reads a JSON file and unmarshals it into the given JAXB class.
   *
   * @param pathname path of the JSON file to read
   * @param classy the class to unmarshal into
   * @return the unmarshalled object
   */
  private <T> T readJSON(String pathname, Class<T> classy)
      throws IOException, JAXBException {
    JSONJAXBContext jc =
        new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
    JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
    // Decode explicitly as UTF-8 so the result does not depend on the
    // platform default charset.
    String contents = new String(Files.readAllBytes(Paths.get(pathname)),
        java.nio.charset.StandardCharsets.UTF_8);
    return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
  }

  /**
   * Test that the policy is asked once per queue, with the full cluster
   * information and no existing manager.
   */
  @Test
  public void testPolicyGenerator() throws YarnException {
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.setPolicy(mock(GlobalPolicy.class));
    policyGenerator.run();
    verify(policyGenerator.getPolicy(), times(1))
        .updatePolicy("default", clusterInfos, null);
    verify(policyGenerator.getPolicy(), times(1))
        .updatePolicy("default2", clusterInfos, null);
  }

  /**
   * Test that a single blacklisted sub-cluster is left out of the cluster
   * information handed to the policy.
   */
  @Test
  public void testBlacklist() throws YarnException {
    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
        subClusterIds.get(0).toString());
    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
        new HashMap<>(clusterInfos);
    blacklistedCMI.remove(subClusterIds.get(0));
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.setPolicy(mock(GlobalPolicy.class));
    policyGenerator.run();
    verify(policyGenerator.getPolicy(), times(1))
        .updatePolicy("default", blacklistedCMI, null);
    verify(policyGenerator.getPolicy(), times(0))
        .updatePolicy("default", clusterInfos, null);
  }

  /**
   * Test that two blacklisted sub-clusters are both left out of the cluster
   * information handed to the policy.
   */
  @Test
  public void testBlacklistTwo() throws YarnException {
    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
        subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
            .toString());
    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
        new HashMap<>(clusterInfos);
    blacklistedCMI.remove(subClusterIds.get(0));
    blacklistedCMI.remove(subClusterIds.get(1));
    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.setPolicy(mock(GlobalPolicy.class));
    policyGenerator.run();
    verify(policyGenerator.getPolicy(), times(1))
        .updatePolicy("default", blacklistedCMI, null);
    verify(policyGenerator.getPolicy(), times(0))
        .updatePolicy("default", clusterInfos, null);
  }

  /**
   * Test that a policy already present in the state store is passed to the
   * global policy as the current manager.
   */
  @Test
  public void testExistingPolicy() throws YarnException {
    WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
    // Add a test policy for test queue
    manager.setQueue("default");
    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
        .createUniformWeights(new HashSet<>(subClusterIds)));
    manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
        .createUniformWeights(new HashSet<>(subClusterIds)));
    SubClusterPolicyConfiguration testConf = manager.serializeConf();
    when(stateStore.getPolicyConfiguration(
        GetSubClusterPolicyConfigurationRequest.newInstance("default")))
        .thenReturn(
            GetSubClusterPolicyConfigurationResponse.newInstance(testConf));

    policyGenerator = new TestablePolicyGenerator();
    policyGenerator.setPolicy(mock(GlobalPolicy.class));
    policyGenerator.run();

    ArgumentCaptor<FederationPolicyManager> argCaptor =
        ArgumentCaptor.forClass(FederationPolicyManager.class);
    verify(policyGenerator.getPolicy(), times(1))
        .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
    // Expected value first, so a failure message reads the right way round.
    assertEquals(manager.getClass(), argCaptor.getValue().getClass());
    assertEquals(manager.serializeConf(), argCaptor.getValue().serializeConf());
  }

  /**
   * Starts a ResourceManager configured with a capacity scheduler queue
   * hierarchy and checks that its scheduler REST endpoint can be invoked
   * through GPGUtils.
   */
  @Test
  public void testCallRM() {
    CapacitySchedulerConfiguration csConf =
        new CapacitySchedulerConfiguration();

    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
    final String a1 = a + ".a1";
    final String a2 = a + ".a2";
    final String b1 = b + ".b1";
    final String b2 = b + ".b2";
    final String b3 = b + ".b3";
    float aCapacity = 10.5f;
    float bCapacity = 89.5f;
    float a1Capacity = 30;
    float a2Capacity = 70;
    float b1Capacity = 79.2f;
    float b2Capacity = 0.8f;
    float b3Capacity = 20;

    // Define top-level queues
    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
        new String[] {"a", "b"});

    csConf.setCapacity(a, aCapacity);
    csConf.setCapacity(b, bCapacity);

    // Define 2nd-level queues
    csConf.setQueues(a, new String[] {"a1", "a2"});
    csConf.setCapacity(a1, a1Capacity);
    csConf.setUserLimitFactor(a1, 100.0f);
    csConf.setCapacity(a2, a2Capacity);
    csConf.setUserLimitFactor(a2, 100.0f);

    csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
    csConf.setCapacity(b1, b1Capacity);
    csConf.setUserLimitFactor(b1, 100.0f);
    csConf.setCapacity(b2, b2Capacity);
    csConf.setUserLimitFactor(b2, 100.0f);
    csConf.setCapacity(b3, b3Capacity);
    csConf.setUserLimitFactor(b3, 100.0f);

    YarnConfiguration rmConf = new YarnConfiguration(csConf);

    ResourceManager resourceManager = new ResourceManager();
    try {
      rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
          ResourceScheduler.class);
      resourceManager.init(rmConf);
      resourceManager.start();

      String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
      SchedulerTypeInfo sti = GPGUtils
          .invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
              SchedulerTypeInfo.class);

      Assert.assertNotNull(sti);
    } finally {
      // Always stop the RM so the test does not leave it running.
      resourceManager.stop();
    }
  }

  /**
   * Testable policy generator overrides the methods that communicate
   * with the RM REST endpoint, allowing us to inject faked responses.
   */
  class TestablePolicyGenerator extends PolicyGenerator {

    TestablePolicyGenerator() {
      super(conf, gpgContext);
    }

    @Override
    protected Map<SubClusterId, Map<Class, Object>> getInfos(
        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
      Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
      for (SubClusterId id : activeSubClusters.keySet()) {
        ret.computeIfAbsent(id, k -> new HashMap<>())
            .put(ClusterMetricsInfo.class,
                clusterInfos.get(id).get(ClusterMetricsInfo.class));
      }
      return ret;
    }

    @Override
    protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
      Map<SubClusterId, SchedulerInfo> ret = new HashMap<>();
      for (SubClusterId id : activeSubClusters.keySet()) {
        ret.put(id, schedulerInfos.get(id));
      }
      return ret;
    }
  }
}

View File

@ -0,0 +1,134 @@
{
"capacity": 100.0,
"usedCapacity": 0.0,
"maxCapacity": 100.0,
"queueName": "root",
"queues": {
"queue": [
{
"type": "capacitySchedulerLeafQueueInfo",
"capacity": 100.0,
"usedCapacity": 0.0,
"maxCapacity": 100.0,
"absoluteCapacity": 100.0,
"absoluteMaxCapacity": 100.0,
"absoluteUsedCapacity": 0.0,
"numApplications": 484,
"queueName": "default",
"state": "RUNNING",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"hideReservationQueues": false,
"nodeLabels": [
"*"
],
"numActiveApplications": 484,
"numPendingApplications": 0,
"numContainers": 0,
"maxApplications": 10000,
"maxApplicationsPerUser": 10000,
"userLimit": 100,
"users": {
"user": [
{
"username": "Default",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"numPendingApplications": 0,
"numActiveApplications": 468,
"AMResourceUsed": {
"memory": 30191616,
"vCores": 468
},
"userResourceLimit": {
"memory": 31490048,
"vCores": 7612
}
}
]
},
"userLimitFactor": 1.0,
"AMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"usedAMResource": {
"memory": 30388224,
"vCores": 532
},
"userAMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"preemptionDisabled": true
}
]
},
"health": {
"lastrun": 1517951638085,
"operationsInfo": {
"entry": {
"key": "last-allocation",
"value": {
"nodeId": "node0:0",
"containerId": "container_e61477_1517922128312_0340_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-reservation",
"value": {
"nodeId": "node0:1",
"containerId": "container_e61477_1517879828320_0249_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-release",
"value": {
"nodeId": "node0:2",
"containerId": "container_e61477_1517922128312_0340_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-preemption",
"value": {
"nodeId": "N/A",
"containerId": "N/A",
"queue": "N/A"
}
}
},
"lastRunDetails": [
{
"operation": "releases",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
},
{
"operation": "allocations",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
},
{
"operation": "reservations",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
}
]
}
}

View File

@ -0,0 +1,196 @@
{
"type": "capacityScheduler",
"capacity": 100.0,
"usedCapacity": 0.0,
"maxCapacity": 100.0,
"queueName": "root",
"queues": {
"queue": [
{
"type": "capacitySchedulerLeafQueueInfo",
"capacity": 100.0,
"usedCapacity": 0.0,
"maxCapacity": 100.0,
"absoluteCapacity": 100.0,
"absoluteMaxCapacity": 100.0,
"absoluteUsedCapacity": 0.0,
"numApplications": 484,
"queueName": "default",
"state": "RUNNING",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"hideReservationQueues": false,
"nodeLabels": [
"*"
],
"numActiveApplications": 484,
"numPendingApplications": 0,
"numContainers": 0,
"maxApplications": 10000,
"maxApplicationsPerUser": 10000,
"userLimit": 100,
"users": {
"user": [
{
"username": "Default",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"numPendingApplications": 0,
"numActiveApplications": 468,
"AMResourceUsed": {
"memory": 30191616,
"vCores": 468
},
"userResourceLimit": {
"memory": 31490048,
"vCores": 7612
}
}
]
},
"userLimitFactor": 1.0,
"AMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"usedAMResource": {
"memory": 30388224,
"vCores": 532
},
"userAMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"preemptionDisabled": true
},
{
"type": "capacitySchedulerLeafQueueInfo",
"capacity": 100.0,
"usedCapacity": 0.0,
"maxCapacity": 100.0,
"absoluteCapacity": 100.0,
"absoluteMaxCapacity": 100.0,
"absoluteUsedCapacity": 0.0,
"numApplications": 484,
"queueName": "default2",
"state": "RUNNING",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"hideReservationQueues": false,
"nodeLabels": [
"*"
],
"numActiveApplications": 484,
"numPendingApplications": 0,
"numContainers": 0,
"maxApplications": 10000,
"maxApplicationsPerUser": 10000,
"userLimit": 100,
"users": {
"user": [
{
"username": "Default",
"resourcesUsed": {
"memory": 0,
"vCores": 0
},
"numPendingApplications": 0,
"numActiveApplications": 468,
"AMResourceUsed": {
"memory": 30191616,
"vCores": 468
},
"userResourceLimit": {
"memory": 31490048,
"vCores": 7612
}
}
]
},
"userLimitFactor": 1.0,
"AMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"usedAMResource": {
"memory": 30388224,
"vCores": 532
},
"userAMResourceLimit": {
"memory": 31490048,
"vCores": 7612
},
"preemptionDisabled": true
}
]
},
"health": {
"lastrun": 1517951638085,
"operationsInfo": {
"entry": {
"key": "last-allocation",
"value": {
"nodeId": "node0:0",
"containerId": "container_e61477_1517922128312_0340_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-reservation",
"value": {
"nodeId": "node0:1",
"containerId": "container_e61477_1517879828320_0249_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-release",
"value": {
"nodeId": "node0:2",
"containerId": "container_e61477_1517922128312_0340_01_000001",
"queue": "root.default"
}
},
"entry": {
"key": "last-preemption",
"value": {
"nodeId": "N/A",
"containerId": "N/A",
"queue": "N/A"
}
}
},
"lastRunDetails": [
{
"operation": "releases",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
},
{
"operation": "allocations",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
},
{
"operation": "reservations",
"count": 0,
"resources": {
"memory": 0,
"vCores": 0
}
}
]
}
}