From 6d32a06125f90485d3e90da2ebfa3d19cf6a818f Mon Sep 17 00:00:00 2001
From: slfan1989 <55643692+slfan1989@users.noreply.github.com>
Date: Fri, 28 Jul 2023 13:41:27 +0800
Subject: [PATCH] YARN-7707. BackPort [GPG] Policy generator framework. (#5810)
---
.../hadoop/yarn/conf/YarnConfiguration.java | 37 ++
.../src/main/resources/yarn-default.xml | 42 +++
.../pom.xml | 18 +
.../globalpolicygenerator/GPGContext.java | 4 +
.../globalpolicygenerator/GPGContextImpl.java | 10 +
.../GPGPolicyFacade.java | 222 ++++++++++++
.../globalpolicygenerator/GPGUtils.java | 86 +++++
.../GlobalPolicyGenerator.java | 38 +-
.../policygenerator/GlobalPolicy.java | 78 ++++
.../policygenerator/NoOpGlobalPolicy.java | 36 ++
.../policygenerator/PolicyGenerator.java | 268 ++++++++++++++
.../UniformWeightedLocalityGlobalPolicy.java | 68 ++++
.../policygenerator/package-info.java | 24 ++
.../TestGPGPolicyFacade.java | 202 +++++++++++
.../policygenerator/TestPolicyGenerator.java | 336 ++++++++++++++++++
.../src/test/resources/schedulerInfo1.json | 134 +++++++
.../src/test/resources/schedulerInfo2.json | 196 ++++++++++
17 files changed, 1797 insertions(+), 2 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 648fddbbbe..faa5c82d7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4390,6 +4390,43 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String GPG_KERBEROS_PRINCIPAL_HOSTNAME_KEY = FEDERATION_GPG_PREFIX +
"kerberos.principal.hostname";
+ public static final String FEDERATION_GPG_POLICY_PREFIX =
+ FEDERATION_GPG_PREFIX + "policy.generator.";
+
+ /** The interval at which the policy generator runs, default is one hour. */
+ public static final String GPG_POLICY_GENERATOR_INTERVAL =
+ FEDERATION_GPG_POLICY_PREFIX + "interval";
+ public static final long DEFAULT_GPG_POLICY_GENERATOR_INTERVAL = TimeUnit.HOURS.toMillis(1);
+
+ /** The interval at which the policy generator runs, default is one hour.
+ * This property is deprecated; prefer setting
+ * `yarn.federation.gpg.policy.generator.interval` instead. */
+ public static final String GPG_POLICY_GENERATOR_INTERVAL_MS =
+ FEDERATION_GPG_POLICY_PREFIX + "interval-ms";
+
+ /**
+ * The configured policy generator class, runs NoOpGlobalPolicy by
+ * default.
+ */
+ public static final String GPG_GLOBAL_POLICY_CLASS = FEDERATION_GPG_POLICY_PREFIX + "class";
+ public static final String DEFAULT_GPG_GLOBAL_POLICY_CLASS =
+ "org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator." +
+ "NoOpGlobalPolicy";
+
+ /**
+ * Whether or not the policy generator is running in read only (won't modify
+ * policies), default is false.
+ */
+ public static final String GPG_POLICY_GENERATOR_READONLY =
+ FEDERATION_GPG_POLICY_PREFIX + "readonly";
+ public static final boolean DEFAULT_GPG_POLICY_GENERATOR_READONLY = false;
+
+ /**
+ * Which sub-clusters the policy generator should blacklist.
+ */
+ public static final String GPG_POLICY_GENERATOR_BLACKLIST =
+ FEDERATION_GPG_POLICY_PREFIX + "blacklist";
+
/**
* Connection and Read timeout from the Router to RM.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index f722af852f..b643bd8d08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5426,4 +5426,46 @@
100ms
+  <property>
+    <description>
+      The interval at which the policy generator runs, default is one hour
+    </description>
+    <name>yarn.federation.gpg.policy.generator.interval</name>
+    <value>1h</value>
+  </property>
+
+  <property>
+    <description>
+      The interval at which the policy generator runs, default is one hour.
+      This property is deprecated; prefer setting
+      `yarn.federation.gpg.policy.generator.interval` instead.
+    </description>
+    <name>yarn.federation.gpg.policy.generator.interval-ms</name>
+    <value>3600000</value>
+  </property>
+
+  <property>
+    <description>
+      The configured policy generator class, runs NoOpGlobalPolicy by default
+    </description>
+    <name>yarn.federation.gpg.policy.generator.class</name>
+    <value>org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.NoOpGlobalPolicy</value>
+  </property>
+
+  <property>
+    <description>
+      Whether or not the policy generator is running in read only (won't modify policies), default is false
+    </description>
+    <name>yarn.federation.gpg.policy.generator.readonly</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      Which subclusters the gpg should blacklist, default is none
+    </description>
+    <name>yarn.federation.gpg.policy.generator.blacklist</name>
+    <value></value>
+  </property>
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
index d8a8b1e221..0d501680d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/pom.xml
@@ -61,6 +61,12 @@
test
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
org.apache.hadoop
hadoop-yarn-server-resourcemanager
@@ -72,6 +78,12 @@
test
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
org.apache.hadoop
hadoop-yarn-server-common
@@ -106,6 +118,12 @@
org.apache.rat
apache-rat-plugin
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/schedulerInfo1.json</exclude>
+            <exclude>src/test/resources/schedulerInfo2.json</exclude>
+          </excludes>
+        </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
index da8a383bd7..6b0a5a4311 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContext.java
@@ -28,4 +28,8 @@ public interface GPGContext {
FederationStateStoreFacade getStateStoreFacade();
void setStateStoreFacade(FederationStateStoreFacade facade);
+
+ /**
+ * Returns the policy facade associated with this context.
+ *
+ * @return the GPGPolicyFacade previously supplied via setPolicyFacade
+ */
+ GPGPolicyFacade getPolicyFacade();
+
+ /**
+ * Sets the policy facade associated with this context.
+ *
+ * @param facade the GPGPolicyFacade to associate with this context
+ */
+ void setPolicyFacade(GPGPolicyFacade facade);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
index 3884ace9ce..bb498448fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGContextImpl.java
@@ -26,6 +26,7 @@
public class GPGContextImpl implements GPGContext {
private FederationStateStoreFacade facade;
+ private GPGPolicyFacade policyFacade;
@Override
public FederationStateStoreFacade getStateStoreFacade() {
@@ -38,4 +39,13 @@ public void setStateStoreFacade(
this.facade = federationStateStoreFacade;
}
+ /** Returns the policy facade currently stored in this context. */
+ @Override
+ public GPGPolicyFacade getPolicyFacade() {
+   return this.policyFacade;
+ }
+
+ /** Stores the given policy facade in this context, replacing any previous one. */
+ @Override
+ public void setPolicyFacade(GPGPolicyFacade gpgPolicyfacade) {
+   this.policyFacade = gpgPolicyfacade;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
new file mode 100644
index 0000000000..9a9de44046
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGPolicyFacade.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A utility class for the GPG Policy Generator to read and write policies
+ * into the FederationStateStore. Policy specific logic is abstracted away in
+ * this class, so the PolicyGenerator can avoid dealing with policy
+ * construction, reinitialization, and serialization.
+ *
+ * <p>There are only two exposed methods:
+ * <ul>
+ * <li>{@link #getPolicyManager(String)}:
+ * Gets the PolicyManager via queue name. Null if there is no policy
+ * configured for the specified queue. The PolicyManager can be used to
+ * extract the {@link FederationRouterPolicy} and
+ * {@link FederationAMRMProxyPolicy}, as well as any policy specific
+ * parameters.</li>
+ * <li>{@link #setPolicyManager(FederationPolicyManager)}:
+ * Sets the PolicyManager. If the policy configuration is the same, no change
+ * occurs. Otherwise, the internal cache is updated and the new configuration
+ * is written into the FederationStateStore.</li>
+ * </ul>
+ *
+ * <p>This class assumes that the GPG is the only service
+ * writing policies. Thus, the only FederationStateStore reads occur the first
+ * time a queue policy is retrieved - after that, the GPG only writes to the
+ * FederationStateStore.
+ *
+ * <p>The class uses a PolicyManager cache and a SubClusterPolicyConfiguration
+ * cache. The primary use for these caches are to serve reads, and to
+ * identify when the PolicyGenerator has actually changed the policy
+ * so unnecessary FederationStateStore policy writes can be avoided.
+ *
+ * <p>The caches are plain {@link HashMap}s and this class performs no
+ * synchronization of its own.
+ */
+public class GPGPolicyFacade {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(GPGPolicyFacade.class);
+
+  private final FederationStateStoreFacade stateStore;
+
+  // Both caches are keyed by queue name.
+  private final Map<String, FederationPolicyManager> policyManagerMap;
+  private final Map<String, SubClusterPolicyConfiguration> policyConfMap;
+
+  // When true, setPolicyManager never writes to the state store.
+  private final boolean readOnly;
+
+  /**
+   * Creates a facade on top of the given state store facade.
+   *
+   * @param stateStore the facade used to read and write policy configurations.
+   * @param conf configuration from which the read-only flag
+   *             ({@link YarnConfiguration#GPG_POLICY_GENERATOR_READONLY})
+   *             is read.
+   */
+  public GPGPolicyFacade(FederationStateStoreFacade stateStore,
+      Configuration conf) {
+    this.stateStore = stateStore;
+    this.policyManagerMap = new HashMap<>();
+    this.policyConfMap = new HashMap<>();
+    this.readOnly =
+        conf.getBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY,
+            YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_READONLY);
+  }
+
+  /**
+   * Provides a utility for the policy generator to read the policy manager
+   * from the FederationStateStore. Because the policy generator should be the
+   * only component updating the policy, this implementation does not use the
+   * reinitialization feature.
+   *
+   * @param queueName the name of the queue we want the policy manager for.
+   * @return the policy manager responsible for the queue policy, or null if
+   *         no policy configuration exists for the queue.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  public FederationPolicyManager getPolicyManager(String queueName)
+      throws YarnException {
+    FederationPolicyManager policyManager = policyManagerMap.get(queueName);
+    if (policyManager != null) {
+      return policyManager;
+    }
+    // The policy manager is not cached: pull the configuration
+    // from the FederationStateStore to create and cache it
+    try {
+      // If we don't have the configuration cached, pull it
+      // from the stateStore
+      SubClusterPolicyConfiguration conf = policyConfMap.get(queueName);
+      if (conf == null) {
+        conf = stateStore.getPolicyConfiguration(queueName);
+      }
+      // If configuration is still null, it does not exist in the
+      // FederationStateStore
+      if (conf == null) {
+        LOG.info("Read null policy for queue {}", queueName);
+        return null;
+      }
+      policyManager =
+          FederationPolicyUtils.instantiatePolicyManager(conf.getType());
+      policyManager.setQueue(queueName);
+
+      // TODO there is currently no way to cleanly deserialize a policy
+      // manager sub type from just the configuration
+      if (policyManager instanceof WeightedLocalityPolicyManager) {
+        WeightedPolicyInfo wpinfo =
+            WeightedPolicyInfo.fromByteBuffer(conf.getParams());
+        WeightedLocalityPolicyManager wlpmanager =
+            (WeightedLocalityPolicyManager) policyManager;
+        LOG.info("Updating policy for queue {} to configured weights router: "
+            + "{}, amrmproxy: {}", queueName,
+            wpinfo.getRouterPolicyWeights(),
+            wpinfo.getAMRMPolicyWeights());
+        wlpmanager.setWeightedPolicyInfo(wpinfo);
+      } else {
+        LOG.warn("Warning: FederationPolicyManager of unsupported type {}, "
+            + "initialization may be incomplete ", policyManager.getClass());
+      }
+
+      policyManagerMap.put(queueName, policyManager);
+      policyConfMap.put(queueName, conf);
+      return policyManager;
+    } catch (YarnException e) {
+      // Pass the exception as the last argument so the stack trace is logged.
+      LOG.error("Error reading SubClusterPolicyConfiguration from state "
+          + "store for queue: {}", queueName, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Provides a utility for the policy generator to write a policy manager
+   * into the FederationStateStore. The facade keeps a cache and will only write
+   * into the FederationStateStore if the policy configuration has changed.
+   * In read-only mode a changed configuration is logged but neither written
+   * nor cached.
+   *
+   * @param policyManager The policy manager we want to update into the state
+   *                      store. It contains policy information as well as
+   *                      the queue name we will update for. A null manager is
+   *                      ignored with a warning.
+   * @throws YarnException exceptions from yarn servers.
+   */
+  public void setPolicyManager(FederationPolicyManager policyManager)
+      throws YarnException {
+    if (policyManager == null) {
+      LOG.warn("Attempting to set null policy manager");
+      return;
+    }
+    // Extract the configuration from the policy manager
+    String queue = policyManager.getQueue();
+    SubClusterPolicyConfiguration conf;
+    try {
+      conf = policyManager.serializeConf();
+    } catch (FederationPolicyInitializationException e) {
+      LOG.warn("Error serializing policy for queue {}", queue, e);
+      throw e;
+    }
+    if (conf == null) {
+      // State store does not currently support setting a policy back to null
+      // because it reads the queue name to set from the policy!
+      LOG.warn("Skip setting policy to null for queue {} into state store",
+          queue);
+      return;
+    }
+    // Compare with configuration cache, if different, write the conf into
+    // store and update our conf and manager cache
+    if (confCacheEqual(queue, conf)) {
+      LOG.info("Setting unchanged policy - state store write skipped");
+      return;
+    }
+    if (readOnly) {
+      LOG.info("[read-only] Skipping policy update for queue {}", queue);
+      return;
+    }
+    try {
+      LOG.info("Updating policy for queue {} into state store", queue);
+      stateStore.setPolicyConfiguration(conf);
+      // Only refresh the caches once the state store write has succeeded.
+      policyConfMap.put(queue, conf);
+      policyManagerMap.put(queue, policyManager);
+    } catch (YarnException e) {
+      LOG.warn("Error writing SubClusterPolicyConfiguration to state "
+          + "store for queue: {}", queue, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Compares a policy configuration against the cached one for a queue.
+   *
+   * @param queue the queue to check the cached policy configuration for
+   * @param conf the new policy configuration
+   * @return whether or not the conf is equal to the cached conf
+   */
+  private boolean confCacheEqual(String queue,
+      SubClusterPolicyConfiguration conf) {
+    SubClusterPolicyConfiguration cachedConf = policyConfMap.get(queue);
+    if (conf == null || cachedConf == null) {
+      // Equal only when both sides are absent.
+      return conf == cachedConf;
+    }
+    return conf.equals(cachedConf);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
new file mode 100644
index 0000000000..636ce92500
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * GPGUtils contains utility functions for the GPG.
+ */
+public final class GPGUtils {
+
+  // hide constructor
+  private GPGUtils() {
+  }
+
+  /**
+   * Performs an invocation of the remote RMWebService. The request is a GET
+   * on {@code ws/v1/cluster/<path>} relative to {@code webAddr}, asking for
+   * an XML response. The Jersey client and response are released before this
+   * method returns, whether it succeeds or fails.
+   *
+   * @param <T> Generic T.
+   * @param webAddr WebAddress.
+   * @param path url path.
+   * @param returnType return type.
+   * @return response entity.
+   * @throws YarnRuntimeException if the response status is not 200 (OK).
+   */
+  public static <T> T invokeRMWebService(String webAddr, String path, final Class<T> returnType) {
+    Client client = Client.create();
+    ClientResponse response = null;
+    try {
+      WebResource webResource = client.resource(webAddr);
+      response = webResource.path("ws/v1/cluster").path(path)
+          .accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
+      if (response.getStatus() != HttpServletResponse.SC_OK) {
+        throw new YarnRuntimeException("Bad response from remote web service: "
+            + response.getStatus());
+      }
+      return response.getEntity(returnType);
+    } finally {
+      // A new Client is created per call, so it must be torn down here;
+      // otherwise every invocation leaks the client and its connection.
+      if (response != null) {
+        response.close();
+      }
+      client.destroy();
+    }
+  }
+
+  /**
+   * Creates a uniform weighting of 1.0 for each sub cluster.
+   *
+   * @param ids subClusterId set
+   * @return weight of subCluster.
+   */
+  public static Map<SubClusterIdInfo, Float> createUniformWeights(
+      Set<SubClusterId> ids) {
+    Map<SubClusterIdInfo, Float> weights = new HashMap<>();
+    for (SubClusterId id : ids) {
+      weights.put(new SubClusterIdInfo(id), 1.0f);
+    }
+    return weights;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
index 014067ad53..518f69d563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GlobalPolicyGenerator.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class GlobalPolicyGenerator extends CompositeService {
// Scheduler service that runs tasks periodically
private ScheduledThreadPoolExecutor scheduledExecutorService;
private SubClusterCleaner subClusterCleaner;
+ private PolicyGenerator policyGenerator;
public GlobalPolicyGenerator() {
super(GlobalPolicyGenerator.class.getName());
@@ -94,13 +96,16 @@ protected void initAndStart(Configuration conf, boolean hasToReboot) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
// Set up the context
- this.gpgContext
- .setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ this.gpgContext.setStateStoreFacade(FederationStateStoreFacade.getInstance());
+ GPGPolicyFacade gpgPolicyFacade =
+ new GPGPolicyFacade(this.gpgContext.getStateStoreFacade(), conf);
+ this.gpgContext.setPolicyFacade(gpgPolicyFacade);
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
+ this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
DefaultMetricsSystem.initialize(METRICS_NAME);
@@ -129,6 +134,35 @@ protected void serviceStart() throws Exception {
LOG.info("Scheduled sub-cluster cleaner with interval: {}",
DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
}
+
+ // Schedule PolicyGenerator
+ // We recommend using yarn.federation.gpg.policy.generator.interval
+ // instead of yarn.federation.gpg.policy.generator.interval-ms
+
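+ // For backward compatibility, first read the deprecated
+ // "yarn.federation.gpg.policy.generator.interval-ms" property.
+ // NOTE(review): yarn-default.xml in this patch also defines a default for this
+ // key, so this lookup presumably never returns null once the defaults are
+ // loaded, and the newer "interval" property would then only be read when
+ // interval-ms is 0 - confirm.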
+ long policyGeneratorIntervalMillis = 0L;
+ String generatorIntervalMS = config.get(YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL_MS);
+ if (generatorIntervalMS != null) {
+ LOG.warn("yarn.federation.gpg.policy.generator.interval-ms is deprecated property, " +
+ " we better set it yarn.federation.gpg.policy.generator.interval.");
+ policyGeneratorIntervalMillis = Long.parseLong(generatorIntervalMS);
+ }
+
+ // If it is not available, let's retrieve
+ // the value of "yarn.federation.gpg.policy.generator.interval" instead.
+ if (policyGeneratorIntervalMillis == 0) {
+ policyGeneratorIntervalMillis = config.getTimeDuration(
+ YarnConfiguration.GPG_POLICY_GENERATOR_INTERVAL,
+ YarnConfiguration.DEFAULT_GPG_POLICY_GENERATOR_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ if(policyGeneratorIntervalMillis > 0){
+ this.scheduledExecutorService.scheduleAtFixedRate(this.policyGenerator,
+ 0, policyGeneratorIntervalMillis, TimeUnit.MILLISECONDS);
+ LOG.info("Scheduled policy-generator with interval: {}",
+ DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
+ }
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
new file mode 100644
index 0000000000..c6d6558dbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * This abstract class defines the plug-able policy that the PolicyGenerator
+ * uses to update policies into the state store.
+ */
+public abstract class GlobalPolicy implements Configurable {
+
+  private Configuration conf;
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return a map of the object type and RM path to request it from - the
+   * framework will query these paths and provide the objects to the policy.
+   * Delegating this responsibility to the PolicyGenerator enables us to avoid
+   * duplicate calls to the same endpoints as the GlobalPolicy is invoked
+   * once per queue.
+   *
+   * @return a map of the object type and RM path; empty by default.
+   */
+  protected Map<Class, String> registerPaths() {
+    // Default register nothing
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Given a queue, cluster metrics, and policy manager, update the policy
+   * to account for the cluster status. This method defines the policy generator
+   * behavior.
+   *
+   * @param queueName   name of the queue
+   * @param clusterInfo subClusterId map to cluster information about the
+   *                    SubCluster used to make policy decisions
+   * @param manager     the FederationPolicyManager for the queue's existing
+   *                    policy; the manager may be null, in which case the
+   *                    policy will need to be created
+   * @return policy manager that handles the updated (or created) policy
+   */
+  protected abstract FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager manager);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
new file mode 100644
index 0000000000..c2d578f771
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/NoOpGlobalPolicy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+import java.util.Map;
+
+/**
+ * Default policy that does not update any policy configurations.
+ */
+public class NoOpGlobalPolicy extends GlobalPolicy {
+
+  @Override
+  public FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo,
+      FederationPolicyManager manager) {
+    return null;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
new file mode 100644
index 0000000000..33501fb1e3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The PolicyGenerator runs periodically and updates the policy configuration
+ * for each queue into the FederationStateStore. The policy update behavior is
+ * defined by the GlobalPolicy instance that is used.
+ */
+
+public class PolicyGenerator implements Runnable, Configurable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PolicyGenerator.class);
+
+  private GPGContext gpgContext;
+  private Configuration conf;
+
+  // Information request map: object type to the RM path it is fetched from
+  private Map<Class<?>, String> pathMap = new HashMap<>();
+
+  // Global policy instance
+  @VisibleForTesting
+  private GlobalPolicy policy;
+
+  /**
+   * The PolicyGenerator periodically reads SubCluster load and updates
+   * policies into the FederationStateStore.
+   *
+   * @param conf Configuration.
+   * @param context GPG Context.
+   */
+  public PolicyGenerator(Configuration conf, GPGContext context) {
+    setConf(conf);
+    init(context);
+  }
+
+  private void init(GPGContext context) {
+    this.gpgContext = context;
+    LOG.info("Initialized PolicyGenerator");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.policy = FederationStateStoreFacade.createInstance(conf,
+        YarnConfiguration.GPG_GLOBAL_POLICY_CLASS,
+        YarnConfiguration.DEFAULT_GPG_GLOBAL_POLICY_CLASS, GlobalPolicy.class);
+    policy.setConf(conf);
+    pathMap.putAll(policy.registerPaths());
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public final void run() {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters;
+    try {
+      activeSubClusters = gpgContext.getStateStoreFacade().getSubClusters(true);
+    } catch (YarnException e) {
+      LOG.error("Error retrieving active sub-clusters", e);
+      return;
+    }
+
+    // Parse the scheduler information from all the SCs
+    Map<SubClusterId, SchedulerInfo> schedInfo = getSchedulerInfo(activeSubClusters);
+
+    // Extract the union of queue names; non-capacity schedulers are skipped
+    Set<String> queueNames = extractQueues(schedInfo);
+
+    // Remove black listed SubClusters
+    activeSubClusters.keySet().removeAll(getBlackList());
+    LOG.info("Active non-blacklist sub-clusters: {}",
+        activeSubClusters.keySet());
+
+    // Get cluster metrics information from non-black listed RMs - later used
+    // to evaluate SubCluster load
+    Map<SubClusterId, Map<Class, Object>> clusterInfo =
+        getInfos(activeSubClusters);
+
+    // Update into the FederationStateStore
+    for (String queueName : queueNames) {
+      // Retrieve the manager from the policy facade
+      FederationPolicyManager manager;
+      try {
+        manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
+      } catch (YarnException e) {
+        LOG.error("GetPolicy for queue {} failed", queueName, e);
+        continue;
+      }
+      LOG.info("Updating policy for queue {}", queueName);
+      manager = policy.updatePolicy(queueName, clusterInfo, manager);
+      try {
+        this.gpgContext.getPolicyFacade().setPolicyManager(manager);
+      } catch (YarnException e) {
+        LOG.error("SetPolicy for queue {} failed", queueName, e);
+      }
+    }
+  }
+
+  /**
+   * Helper to retrieve metrics from the RM REST endpoints.
+   *
+   * @param activeSubClusters A map of active SubCluster IDs to info
+   * @return Mapping relationship between SubClusterId and Metric.
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, Map<Class, Object>> getInfos(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+
+    Map<SubClusterId, Map<Class, Object>> clusterInfo = new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      for (Map.Entry<Class<?>, String> e : this.pathMap.entrySet()) {
+        if (!clusterInfo.containsKey(sci.getSubClusterId())) {
+          clusterInfo.put(sci.getSubClusterId(), new HashMap<>());
+        }
+        Object ret = GPGUtils.invokeRMWebService(sci.getRMWebServiceAddress(),
+            e.getValue(), e.getKey());
+        clusterInfo.get(sci.getSubClusterId()).put(e.getKey(), ret);
+      }
+    }
+
+    return clusterInfo;
+  }
+
+  /**
+   * Helper to retrieve SchedulerInfos.
+   *
+   * @param activeSubClusters A map of active SubCluster IDs to info
+   * @return Mapping relationship between SubClusterId and SchedulerInfo.
+   */
+  @VisibleForTesting
+  protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+    Map<SubClusterId, SchedulerInfo> schedInfo =
+        new HashMap<>();
+    for (SubClusterInfo sci : activeSubClusters.values()) {
+      SchedulerTypeInfo sti = GPGUtils
+          .invokeRMWebService(sci.getRMWebServiceAddress(),
+              RMWSConsts.SCHEDULER, SchedulerTypeInfo.class);
+      if (sti != null) {
+        schedInfo.put(sci.getSubClusterId(), sti.getSchedulerInfo());
+      } else {
+        LOG.warn("Skipped null scheduler info from SubCluster {}",
+            sci.getSubClusterId());
+      }
+    }
+    return schedInfo;
+  }
+
+  /**
+   * Helper to get a set of blacklisted SubCluster Ids from configuration.
+   */
+  private Set<SubClusterId> getBlackList() {
+    String blackListParam =
+        conf.get(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST);
+    if (blackListParam == null) {
+      return Collections.emptySet();
+    }
+    Set<SubClusterId> blackList = new HashSet<>();
+    for (String id : blackListParam.split(",")) {
+      blackList.add(SubClusterId.newInstance(id.trim()));
+    }
+    return blackList;
+  }
+
+  /**
+   * Given the scheduler information for all RMs, extract the union of
+   * queue names - right now we only consider instances of capacity scheduler.
+   *
+   * @param schedInfo the scheduler information
+   * @return a set of queue names
+   */
+  private Set<String> extractQueues(Map<SubClusterId, SchedulerInfo> schedInfo) {
+    Set<String> queueNames = new HashSet<>();
+    for (Map.Entry<SubClusterId, SchedulerInfo> entry : schedInfo.entrySet()) {
+      if (entry.getValue() instanceof CapacitySchedulerInfo) {
+        // Flatten the queue structure, keep only non leaf queues (may be none)
+        queueNames.addAll(flattenQueue((CapacitySchedulerInfo) entry.getValue())
+            .getOrDefault(CapacitySchedulerQueueInfo.class, Collections.emptySet()));
+      } else {
+        LOG.warn("Skipping SubCluster {}, not configured with capacity scheduler.",
+            entry.getKey());
+      }
+    }
+    return queueNames;
+  }
+
+  // Helpers to flatten the queue structure into a multimap of
+  // queue type to set of queue names
+  private Map<Class, Set<String>> flattenQueue(CapacitySchedulerInfo csi) {
+    Map<Class, Set<String>> flattened = new HashMap<>();
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
+      flattenQueue(csqi, flattened);
+    }
+    return flattened;
+  }
+
+  private void flattenQueue(CapacitySchedulerQueueInfo csi,
+      Map<Class, Set<String>> flattened) {
+    addOrAppend(flattened, csi.getClass(), csi.getQueueName());
+    if (csi.getQueues() != null) {
+      for (CapacitySchedulerQueueInfo csqi : csi.getQueues().getQueueInfoList()) {
+        flattenQueue(csqi, flattened);
+      }
+    }
+  }
+
+  private <K, V> void addOrAppend(Map<K, Set<V>> multimap, K key, V value) {
+    if (!multimap.containsKey(key)) {
+      multimap.put(key, new HashSet<>());
+    }
+    multimap.get(key).add(value);
+  }
+
+  public GlobalPolicy getPolicy() {
+    return policy;
+  }
+
+  public void setPolicy(GlobalPolicy policy) {
+    this.policy = policy;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
new file mode 100644
index 0000000000..23e99062c7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/UniformWeightedLocalityGlobalPolicy.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Simple policy that generates and updates uniform weighted locality
+ * policies.
+ */
+public class UniformWeightedLocalityGlobalPolicy extends GlobalPolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UniformWeightedLocalityGlobalPolicy.class);
+
+  @Override
+  protected FederationPolicyManager updatePolicy(String queueName,
+      Map<SubClusterId, Map<Class, Object>> clusterInfo, FederationPolicyManager currentManager) {
+
+    if (currentManager == null) {
+      // Set uniform weights for all SubClusters
+      LOG.info("Creating uniform weighted policy queue {}.", queueName);
+      WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+      manager.setQueue(queueName);
+      Map<SubClusterIdInfo, Float> policyWeights =
+          GPGUtils.createUniformWeights(clusterInfo.keySet());
+      manager.getWeightedPolicyInfo().setAMRMPolicyWeights(policyWeights);
+      manager.getWeightedPolicyInfo().setRouterPolicyWeights(policyWeights);
+      currentManager = manager;
+    }
+
+    if (currentManager instanceof WeightedLocalityPolicyManager) {
+      LOG.info("Updating policy for queue {} to default weights.", queueName);
+      WeightedLocalityPolicyManager wlpmanager = (WeightedLocalityPolicyManager) currentManager;
+      Map<SubClusterIdInfo, Float> uniformWeights =
+          GPGUtils.createUniformWeights(clusterInfo.keySet());
+      wlpmanager.getWeightedPolicyInfo().setAMRMPolicyWeights(uniformWeights);
+      wlpmanager.getWeightedPolicyInfo().setRouterPolicyWeights(uniformWeights);
+    } else {
+      LOG.info("Policy for queue {} is of type {}, expected {}",
+          queueName, currentManager.getClass(), WeightedLocalityPolicyManager.class);
+    }
+    return currentManager;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
new file mode 100644
index 0000000000..e8ff436ad3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes comprising the policy generator for the GPG. Responsibilities include
+ * generating and updating policies based on the cluster status.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
new file mode 100644
index 0000000000..d78c11fa1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/TestGPGPolicyFacade.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Unit test for GPG Policy Facade.
+ */
+public class TestGPGPolicyFacade {
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private FederationStateStoreFacade facade =
+      FederationStateStoreFacade.getInstance();
+  private GPGPolicyFacade policyFacade;
+
+  private Set<SubClusterId> subClusterIds;
+
+  private SubClusterPolicyConfiguration testConf;
+
+  private static final String TEST_QUEUE = "test-queue";
+
+  public TestGPGPolicyFacade() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+    subClusterIds = new HashSet<>();
+    subClusterIds.add(SubClusterId.newInstance("sc0"));
+    subClusterIds.add(SubClusterId.newInstance("sc1"));
+    subClusterIds.add(SubClusterId.newInstance("sc2"));
+  }
+
+  @Before
+  public void setUp() throws IOException, YarnException {
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+    facade.reinitialize(stateStore, conf);
+    policyFacade = new GPGPolicyFacade(facade, conf);
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    testConf = manager.serializeConf();
+    stateStore.setPolicyConfiguration(SetSubClusterPolicyConfigurationRequest
+        .newInstance(testConf));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stateStore.close();
+    stateStore = null;
+  }
+
+  @Test
+  public void testGetPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(testConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that new policies are written into the state store.
+   */
+  @Test
+  public void testSetNewPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE + 0);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE + 0);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that overwriting policies are updated in the state store.
+   */
+  @Test
+  public void testOverwritePolicy() throws YarnException {
+    subClusterIds.add(SubClusterId.newInstance("sc3"));
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    SubClusterPolicyConfiguration policyConf = manager.serializeConf();
+    policyFacade.setPolicyManager(manager);
+
+    manager =
+        (WeightedLocalityPolicyManager) policyFacade
+            .getPolicyManager(TEST_QUEUE);
+    Assert.assertEquals(policyConf, manager.serializeConf());
+  }
+
+  /**
+   * Test that the write through cache works.
+   */
+  @Test
+  public void testWriteCache() throws YarnException {
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // Query once to fill the cache
+    FederationPolicyManager manager = policyFacade.getPolicyManager(TEST_QUEUE);
+    // State store should be contacted once
+    verify(stateStore, times(1)).getPolicyConfiguration(
+        Matchers.any(GetSubClusterPolicyConfigurationRequest.class));
+
+    // If we set the same policy, the state store should be untouched
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+  /**
+   * Test that when read only is enabled, the state store is not changed.
+   */
+  @Test
+  public void testReadOnly() throws YarnException {
+    conf.setBoolean(YarnConfiguration.GPG_POLICY_GENERATOR_READONLY, true);
+    stateStore = mock(MemoryFederationStateStore.class);
+    facade.reinitialize(stateStore, conf);
+    when(stateStore.getPolicyConfiguration(Matchers.any(
+        GetSubClusterPolicyConfigurationRequest.class))).thenReturn(
+        GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+    policyFacade = new GPGPolicyFacade(facade, conf);
+
+    // If we set a policy, the state store should be untouched
+    WeightedLocalityPolicyManager manager =
+        new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue(TEST_QUEUE);
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(
+        GPGUtils.createUniformWeights(subClusterIds));
+    policyFacade.setPolicyManager(manager);
+    verify(stateStore, times(0)).setPolicyConfiguration(
+        Matchers.any(SetSubClusterPolicyConfigurationRequest.class));
+  }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
new file mode 100644
index 0000000000..0fe475e3fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestPolicyGenerator.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGPolicyFacade;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for GPG Policy Generator.
+ *
+ * Every test runs against a mocked {@link FederationStateStore} that reports
+ * {@link #NUM_SC} running sub-clusters. The calls that would normally reach
+ * the RM REST endpoint are replaced with canned data by
+ * {@link TestablePolicyGenerator}.
+ */
+public class TestPolicyGenerator {
+
+  /** Number of sub-clusters registered by {@link #setUp()}. */
+  private static final int NUM_SC = 3;
+
+  private final Configuration conf;
+  private FederationStateStore stateStore;
+  private final FederationStateStoreFacade facade =
+      FederationStateStoreFacade.getInstance();
+
+  /** Ids "sc0" .. "sc(NUM_SC - 1)", in creation order. */
+  private List<SubClusterId> subClusterIds;
+  /** Sub-cluster records handed out by the mocked state store. */
+  private Map<SubClusterId, SubClusterInfo> subClusterInfos;
+  /** Faked per-sub-cluster info objects, keyed by their DAO class. */
+  private Map<SubClusterId, Map<Class, Object>> clusterInfos;
+  /** Faked per-sub-cluster scheduler info read from the JSON fixtures. */
+  private Map<SubClusterId, SchedulerInfo> schedulerInfos;
+
+  private final GPGContext gpgContext;
+
+  private PolicyGenerator policyGenerator;
+
+  /**
+   * Creates the configuration (federation cache TTL set to 0) and a GPG
+   * context that shares the state store facade with the tests.
+   */
+  public TestPolicyGenerator() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    gpgContext = new GPGContextImpl();
+    gpgContext.setPolicyFacade(new GPGPolicyFacade(facade, conf));
+    gpgContext.setStateStoreFacade(facade);
+  }
+
+  /**
+   * Registers {@link #NUM_SC} running sub-clusters, each with a cluster
+   * metrics object and the scheduler info from schedulerInfo1.json. The
+   * first sub-cluster is then switched to schedulerInfo2.json, which also
+   * contains the "default2" queue. Finally the facade is re-initialized
+   * against a mocked state store that returns these sub-clusters.
+   *
+   * @throws IOException if a JSON fixture cannot be read
+   * @throws YarnException declared for the state store and facade calls
+   * @throws JAXBException if a JSON fixture cannot be unmarshalled
+   */
+  @Before
+  public void setUp() throws IOException, YarnException, JAXBException {
+    subClusterIds = new ArrayList<>();
+    subClusterInfos = new HashMap<>();
+    clusterInfos = new HashMap<>();
+    schedulerInfos = new HashMap<>();
+
+    CapacitySchedulerInfo sti1 =
+        readJSON("src/test/resources/schedulerInfo1.json",
+            CapacitySchedulerInfo.class);
+    CapacitySchedulerInfo sti2 =
+        readJSON("src/test/resources/schedulerInfo2.json",
+            CapacitySchedulerInfo.class);
+
+    // Set up sub clusters
+    for (int i = 0; i < NUM_SC; ++i) {
+      // Sub cluster Id
+      SubClusterId id = SubClusterId.newInstance("sc" + i);
+      subClusterIds.add(id);
+
+      // Sub cluster info
+      SubClusterInfo cluster = SubClusterInfo
+          .newInstance(id, "amrm:" + i, "clientrm:" + i, "rmadmin:" + i,
+              "rmweb:" + i, SubClusterState.SC_RUNNING, 0, "");
+      subClusterInfos.put(id, cluster);
+
+      // Cluster metrics info; the outer map was just created, so each id
+      // gets a fresh inner map.
+      ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+      metricsInfo.setAppsPending(2000);
+      Map<Class, Object> infos = new HashMap<>();
+      infos.put(ClusterMetricsInfo.class, metricsInfo);
+      clusterInfos.put(id, infos);
+
+      schedulerInfos.put(id, sti1);
+    }
+
+    // Change one of the sub cluster schedulers
+    schedulerInfos.put(subClusterIds.get(0), sti2);
+
+    stateStore = mock(FederationStateStore.class);
+    when(stateStore.getSubClusters(any()))
+        .thenReturn(GetSubClustersInfoResponse.newInstance(
+            new ArrayList<>(subClusterInfos.values())));
+    facade.reinitialize(stateStore, conf);
+  }
+
+  /**
+   * Closes and drops the state store created by {@link #setUp()}.
+   *
+   * @throws Exception if closing the state store fails
+   */
+  @After
+  public void tearDown() throws Exception {
+    // setUp() may have failed before the store was created; do not add a
+    // NullPointerException on top of the original failure.
+    if (stateStore != null) {
+      stateStore.close();
+      stateStore = null;
+    }
+  }
+
+  /**
+   * Reads a JSON file and unmarshals it into the requested JAXB class.
+   *
+   * @param pathname path of the JSON file to read
+   * @param classy class to unmarshal into
+   * @param <T> type of the unmarshalled object
+   * @return the unmarshalled object
+   * @throws IOException if the file cannot be read
+   * @throws JAXBException if the JAXB context cannot be created or the
+   *         content cannot be unmarshalled
+   */
+  private <T> T readJSON(String pathname, Class<T> classy)
+      throws IOException, JAXBException {
+    JSONJAXBContext jc =
+        new JSONJAXBContext(JSONConfiguration.mapped().build(), classy);
+    JSONUnmarshaller unmarshaller = jc.createJSONUnmarshaller();
+    // Decode explicitly as UTF-8 instead of the platform default charset.
+    String contents = new String(Files.readAllBytes(Paths.get(pathname)),
+        StandardCharsets.UTF_8);
+    return unmarshaller.unmarshalFromJSON(new StringReader(contents), classy);
+  }
+
+  /**
+   * Creates a {@link TestablePolicyGenerator} with a mocked
+   * {@link GlobalPolicy}, stores it in {@link #policyGenerator} and runs it
+   * once.
+   */
+  private void runGeneratorWithMockPolicy() throws YarnException {
+    policyGenerator = new TestablePolicyGenerator();
+    policyGenerator.setPolicy(mock(GlobalPolicy.class));
+    policyGenerator.run();
+  }
+
+  /**
+   * Expects one policy update per queue ("default" and "default2"), each
+   * receiving the info of all sub-clusters and no existing policy manager.
+   */
+  @Test
+  public void testPolicyGenerator() throws YarnException {
+    runGeneratorWithMockPolicy();
+    verify(policyGenerator.getPolicy(), times(1))
+        .updatePolicy("default", clusterInfos, null);
+    verify(policyGenerator.getPolicy(), times(1))
+        .updatePolicy("default2", clusterInfos, null);
+  }
+
+  /**
+   * Blacklists the first sub-cluster and expects the "default" queue update
+   * to receive the cluster info without it, never the full map.
+   */
+  @Test
+  public void testBlacklist() throws YarnException {
+    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+        subClusterIds.get(0).toString());
+    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+        new HashMap<>(clusterInfos);
+    blacklistedCMI.remove(subClusterIds.get(0));
+    runGeneratorWithMockPolicy();
+    verify(policyGenerator.getPolicy(), times(1))
+        .updatePolicy("default", blacklistedCMI, null);
+    verify(policyGenerator.getPolicy(), times(0))
+        .updatePolicy("default", clusterInfos, null);
+  }
+
+  /**
+   * Same as {@link #testBlacklist()}, with the first two sub-clusters given
+   * as a comma-separated blacklist.
+   */
+  @Test
+  public void testBlacklistTwo() throws YarnException {
+    conf.set(YarnConfiguration.GPG_POLICY_GENERATOR_BLACKLIST,
+        subClusterIds.get(0).toString() + "," + subClusterIds.get(1)
+            .toString());
+    Map<SubClusterId, Map<Class, Object>> blacklistedCMI =
+        new HashMap<>(clusterInfos);
+    blacklistedCMI.remove(subClusterIds.get(0));
+    blacklistedCMI.remove(subClusterIds.get(1));
+    runGeneratorWithMockPolicy();
+    verify(policyGenerator.getPolicy(), times(1))
+        .updatePolicy("default", blacklistedCMI, null);
+    verify(policyGenerator.getPolicy(), times(0))
+        .updatePolicy("default", clusterInfos, null);
+  }
+
+  /**
+   * Makes the state store return a serialized weighted-locality policy for
+   * the "default" queue and checks that the manager passed to the policy
+   * update has the same class and the same serialized configuration.
+   */
+  @Test
+  public void testExistingPolicy() throws YarnException {
+    WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+    // Add a test policy for test queue
+    manager.setQueue("default");
+    manager.getWeightedPolicyInfo().setAMRMPolicyWeights(GPGUtils
+        .createUniformWeights(new HashSet<>(subClusterIds)));
+    manager.getWeightedPolicyInfo().setRouterPolicyWeights(GPGUtils
+        .createUniformWeights(new HashSet<>(subClusterIds)));
+    SubClusterPolicyConfiguration testConf = manager.serializeConf();
+    when(stateStore.getPolicyConfiguration(
+        GetSubClusterPolicyConfigurationRequest.newInstance("default")))
+        .thenReturn(
+            GetSubClusterPolicyConfigurationResponse.newInstance(testConf));
+
+    runGeneratorWithMockPolicy();
+
+    ArgumentCaptor<FederationPolicyManager> argCaptor =
+        ArgumentCaptor.forClass(FederationPolicyManager.class);
+    verify(policyGenerator.getPolicy(), times(1))
+        .updatePolicy(eq("default"), eq(clusterInfos), argCaptor.capture());
+    // Expected value first, so failure messages read the right way round.
+    assertEquals(manager.getClass(), argCaptor.getValue().getClass());
+    assertEquals(manager.serializeConf(), argCaptor.getValue().serializeConf());
+  }
+
+  /**
+   * Starts a ResourceManager that uses the CapacityScheduler with a
+   * two-level queue hierarchy, then checks that querying the scheduler REST
+   * resource through {@link GPGUtils#invokeRMWebService} returns a result.
+   */
+  @Test
+  public void testCallRM() {
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+
+    final String a = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String b = CapacitySchedulerConfiguration.ROOT + ".b";
+    final String a1 = a + ".a1";
+    final String a2 = a + ".a2";
+    final String b1 = b + ".b1";
+    final String b2 = b + ".b2";
+    final String b3 = b + ".b3";
+    float aCapacity = 10.5f;
+    float bCapacity = 89.5f;
+    float a1Capacity = 30;
+    float a2Capacity = 70;
+    float b1Capacity = 79.2f;
+    float b2Capacity = 0.8f;
+    float b3Capacity = 20;
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    csConf.setCapacity(a, aCapacity);
+    csConf.setCapacity(b, bCapacity);
+
+    // Define 2nd-level queues
+    csConf.setQueues(a, new String[] {"a1", "a2"});
+    csConf.setCapacity(a1, a1Capacity);
+    csConf.setUserLimitFactor(a1, 100.0f);
+    csConf.setCapacity(a2, a2Capacity);
+    csConf.setUserLimitFactor(a2, 100.0f);
+
+    csConf.setQueues(b, new String[] {"b1", "b2", "b3"});
+    csConf.setCapacity(b1, b1Capacity);
+    csConf.setUserLimitFactor(b1, 100.0f);
+    csConf.setCapacity(b2, b2Capacity);
+    csConf.setUserLimitFactor(b2, 100.0f);
+    csConf.setCapacity(b3, b3Capacity);
+    csConf.setUserLimitFactor(b3, 100.0f);
+
+    YarnConfiguration rmConf = new YarnConfiguration(csConf);
+    rmConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    ResourceManager resourceManager = new ResourceManager();
+    resourceManager.init(rmConf);
+    try {
+      resourceManager.start();
+
+      // NOTE(review): the address is resolved from the test-level conf, not
+      // from rmConf; presumably both resolve to the same default web
+      // address - confirm.
+      String rmAddress = WebAppUtils.getRMWebAppURLWithScheme(this.conf);
+      SchedulerTypeInfo sti = GPGUtils
+          .invokeRMWebService(rmAddress, RMWSConsts.SCHEDULER,
+              SchedulerTypeInfo.class);
+
+      Assert.assertNotNull(sti);
+    } finally {
+      // Stop the RM even when the assertion fails, so it does not keep
+      // running for the rest of the test JVM.
+      resourceManager.stop();
+    }
+  }
+
+  /**
+   * Testable policy generator overrides the methods that communicate
+   * with the RM REST endpoint, allowing us to inject faked responses.
+   */
+  class TestablePolicyGenerator extends PolicyGenerator {
+
+    TestablePolicyGenerator() {
+      super(conf, gpgContext);
+    }
+
+    /**
+     * Returns the faked cluster metrics of every active sub-cluster instead
+     * of querying the RMs.
+     *
+     * @param activeSubClusters the sub-clusters to report on
+     * @return for each active sub-cluster, a new map holding its
+     *         {@link ClusterMetricsInfo} from the test fixture
+     */
+    @Override
+    protected Map<SubClusterId, Map<Class, Object>> getInfos(
+        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+      Map<SubClusterId, Map<Class, Object>> ret = new HashMap<>();
+      for (SubClusterId id : activeSubClusters.keySet()) {
+        Map<Class, Object> infos = new HashMap<>();
+        infos.put(ClusterMetricsInfo.class,
+            clusterInfos.get(id).get(ClusterMetricsInfo.class));
+        ret.put(id, infos);
+      }
+      return ret;
+    }
+
+    /**
+     * Returns the faked scheduler info of every active sub-cluster instead
+     * of querying the RMs.
+     *
+     * @param activeSubClusters the sub-clusters to report on
+     * @return the fixture scheduler info keyed by sub-cluster id
+     */
+    @Override
+    protected Map<SubClusterId, SchedulerInfo> getSchedulerInfo(
+        Map<SubClusterId, SubClusterInfo> activeSubClusters) {
+      Map<SubClusterId, SchedulerInfo> ret = new HashMap<>();
+      for (SubClusterId id : activeSubClusters.keySet()) {
+        ret.put(id, schedulerInfos.get(id));
+      }
+      return ret;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
new file mode 100644
index 0000000000..3ad45945f9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo1.json
@@ -0,0 +1,134 @@
+{
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "queueName": "root",
+ "queues": {
+ "queue": [
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ }
+ ]
+ },
+ "health": {
+ "lastrun": 1517951638085,
+ "operationsInfo": {
+ "entry": {
+ "key": "last-allocation",
+ "value": {
+ "nodeId": "node0:0",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-reservation",
+ "value": {
+ "nodeId": "node0:1",
+ "containerId": "container_e61477_1517879828320_0249_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-release",
+ "value": {
+ "nodeId": "node0:2",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-preemption",
+ "value": {
+ "nodeId": "N/A",
+ "containerId": "N/A",
+ "queue": "N/A"
+ }
+ }
+ },
+ "lastRunDetails": [
+ {
+ "operation": "releases",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "allocations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "reservations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json
new file mode 100644
index 0000000000..2ff879e869
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/resources/schedulerInfo2.json
@@ -0,0 +1,196 @@
+ {
+ "type": "capacityScheduler",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "queueName": "root",
+ "queues": {
+ "queue": [
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ },
+ {
+ "type": "capacitySchedulerLeafQueueInfo",
+ "capacity": 100.0,
+ "usedCapacity": 0.0,
+ "maxCapacity": 100.0,
+ "absoluteCapacity": 100.0,
+ "absoluteMaxCapacity": 100.0,
+ "absoluteUsedCapacity": 0.0,
+ "numApplications": 484,
+ "queueName": "default2",
+ "state": "RUNNING",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "hideReservationQueues": false,
+ "nodeLabels": [
+ "*"
+ ],
+ "numActiveApplications": 484,
+ "numPendingApplications": 0,
+ "numContainers": 0,
+ "maxApplications": 10000,
+ "maxApplicationsPerUser": 10000,
+ "userLimit": 100,
+ "users": {
+ "user": [
+ {
+ "username": "Default",
+ "resourcesUsed": {
+ "memory": 0,
+ "vCores": 0
+ },
+ "numPendingApplications": 0,
+ "numActiveApplications": 468,
+ "AMResourceUsed": {
+ "memory": 30191616,
+ "vCores": 468
+ },
+ "userResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ }
+ }
+ ]
+ },
+ "userLimitFactor": 1.0,
+ "AMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "usedAMResource": {
+ "memory": 30388224,
+ "vCores": 532
+ },
+ "userAMResourceLimit": {
+ "memory": 31490048,
+ "vCores": 7612
+ },
+ "preemptionDisabled": true
+ }
+ ]
+ },
+ "health": {
+ "lastrun": 1517951638085,
+ "operationsInfo": {
+ "entry": {
+ "key": "last-allocation",
+ "value": {
+ "nodeId": "node0:0",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-reservation",
+ "value": {
+ "nodeId": "node0:1",
+ "containerId": "container_e61477_1517879828320_0249_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-release",
+ "value": {
+ "nodeId": "node0:2",
+ "containerId": "container_e61477_1517922128312_0340_01_000001",
+ "queue": "root.default"
+ }
+ },
+ "entry": {
+ "key": "last-preemption",
+ "value": {
+ "nodeId": "N/A",
+ "containerId": "N/A",
+ "queue": "N/A"
+ }
+ }
+ },
+ "lastRunDetails": [
+ {
+ "operation": "releases",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "allocations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ },
+ {
+ "operation": "reservations",
+ "count": 0,
+ "resources": {
+ "memory": 0,
+ "vCores": 0
+ }
+ }
+ ]
+ }
+ }