diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index cf457c23eb..309c028580 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -209,6 +209,10 @@
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index faa5c82d7e..ae7ea196d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -4427,6 +4427,24 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String GPG_POLICY_GENERATOR_BLACKLIST =
FEDERATION_GPG_POLICY_PREFIX + "blacklist";
+ private static final String FEDERATION_GPG_LOAD_BASED_PREFIX =
+ YarnConfiguration.FEDERATION_GPG_PREFIX + "policy.generator.load-based.";
+ public static final String FEDERATION_GPG_LOAD_BASED_MIN_PENDING =
+ FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.minimum";
+ public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING = 100;
+ public static final String FEDERATION_GPG_LOAD_BASED_MAX_PENDING =
+ FEDERATION_GPG_LOAD_BASED_PREFIX + "pending.maximum";
+ public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING = 1000;
+ public static final String FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT =
+ FEDERATION_GPG_LOAD_BASED_PREFIX + "weight.minimum";
+ public static final float DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT = 0.0f;
+ public static final String FEDERATION_GPG_LOAD_BASED_MAX_EDIT =
+ FEDERATION_GPG_LOAD_BASED_PREFIX + "edit.maximum";
+ public static final int DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT = 3;
+ public static final String FEDERATION_GPG_LOAD_BASED_SCALING =
+ FEDERATION_GPG_LOAD_BASED_PREFIX + "scaling";
+ public static final String DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING = "LINEAR";
+
/**
* Connection and Read timeout from the Router to RM.
*/
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b643bd8d08..6cab018e5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -5468,4 +5468,66 @@
+
+
+ GPG load policy, the minimum number of pending applications in the subCluster.
+
+ yarn.federation.gpg.policy.generator.load-based.pending.minimum
+ 100
+
+
+
+
+ GPG load policy, the maximum number of pending applications in the subCluster.
+
+ yarn.federation.gpg.policy.generator.load-based.pending.maximum
+ 1000
+
+
+
+
+ GPG load policy, the subCluster minimum weight,
+ If a subCluster has a very high load, we will assign this value to the subCluster.
+ The default value is 0, which means that we no longer assign appliaction to this subCluster.
+
+ yarn.federation.gpg.policy.generator.load-based.weight.minimum
+ 0
+
+
+
+
+ GPG load policy, We choose the subCluster computing load of TopN.
+ This value represents the number of subClusters we want to calculate.
+
+ yarn.federation.gpg.policy.generator.load-based.edit.maximum
+ 3
+
+
+
+
+ GPG load policy, We provide 4 calculation methods: NONE, LINEAR, QUADRATIC, LOG.
+
+ Note, this calculation method is when the number of Pending Applications in
+ the subCluster is less than yarn.federation.gpg.policy.generator.load-based.pending.maximum.
+
+ maxPendingVal = yarn.federation.gpg.policy.generator.load-based.pending.maximum -
+ yarn.federation.gpg.policy.generator.load-based.pending.minimum
+ curPendingVal = Pending Applications in the subCluster -
+ yarn.federation.gpg.policy.generator.load-based.pending.minimum
+
+ 1. NONE: No calculation is required, and the weight is 1 at this time.
+ 2. LINEAR: For linear computation, we will use (maxPendingVal - curPendingVal) / (maxPendingVal).
+ 3. QUADRATIC: Calculated using quadratic,
+ We will calculate quadratic for maxPendingVal, curPendingVal,
+ then use this formula = (maxPendingVal - curPendingVal) / (maxPendingVal).
+ 4. LOG(LOGARITHM): Calculated using logarithm,
+ We will calculate logarithm for maxPendingVal, curPendingVal,
+ then use this formula = (maxPendingVal - curPendingVal) / (maxPendingVal).
+
+ LINEAR is used by default.
+
+ yarn.federation.gpg.policy.generator.load-based.scaling
+ LINEAR
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
index 6d2e1d4142..2bb56caeff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/GPGUtils.java
@@ -55,7 +55,7 @@ private GPGUtils() {
*/
public static T invokeRMWebService(String webAddr, String path, final Class returnType) {
Client client = Client.create();
- T obj = null;
+ T obj;
WebResource webResource = client.resource(webAddr);
ClientResponse response = null;
@@ -86,8 +86,7 @@ public static T invokeRMWebService(String webAddr, String path, final Class<
*/
public static Map createUniformWeights(
Set ids) {
- Map weights =
- new HashMap<>();
+ Map weights = new HashMap<>();
for(SubClusterId id : ids) {
weights.put(new SubClusterIdInfo(id), 1.0f);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
index c6d6558dbe..ab60a48434 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/GlobalPolicy.java
@@ -53,7 +53,7 @@ public Configuration getConf() {
*
* @return a map of the object type and RM path.
*/
- protected Map registerPaths() {
+ protected Map, String> registerPaths() {
// Default register nothing
return Collections.emptyMap();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java
new file mode 100644
index 0000000000..f728b92d71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/LoadBasedGlobalPolicy.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
+import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING;
+
+/**
+ * Load based policy that generates weighted policies by scaling
+ * the cluster load (based on pending) to a weight from 0.0 to 1.0.
+ */
+public class LoadBasedGlobalPolicy extends GlobalPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LoadBasedGlobalPolicy.class);
+
+ public enum Scaling {
+ LINEAR,
+ QUADRATIC,
+ LOG,
+ NONE
+ }
+
+ // Minimum pending count before the policy starts scaling down the weights
+ private int minPending;
+ // Maximum pending count before policy stops scaling down the weights
+ // (they'll be set to min weight)
+ private int maxPending;
+ // Minimum weight that a sub cluster will be assigned
+ private float minWeight;
+ // Maximum number of weights that can be scaled down simultaneously
+ private int maxEdit;
+ // Scaling type
+ private Scaling scaling = Scaling.NONE;
+
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ minPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING,
+ DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_PENDING);
+ maxPending = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING,
+ DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_PENDING);
+ minWeight = conf.getFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT,
+ DEFAULT_FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT);
+ maxEdit = conf.getInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT,
+ DEFAULT_FEDERATION_GPG_LOAD_BASED_MAX_EDIT);
+
+ try {
+ scaling = Scaling.valueOf(conf.get(FEDERATION_GPG_LOAD_BASED_SCALING,
+ DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING));
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Invalid scaling mode provided", e);
+ }
+
+ // Check that all configuration values are valid
+ if (!(minPending <= maxPending)) {
+ throw new YarnRuntimeException("minPending = " + minPending
+ + " must be less than or equal to maxPending=" + maxPending);
+ }
+ if (!(minWeight >= 0 && minWeight < 1)) {
+ throw new YarnRuntimeException(
+ "minWeight = " + minWeight + " must be within range [0,1)");
+ }
+ }
+
+ @Override
+ protected Map, String> registerPaths() {
+ // Register for the endpoints we want to receive information on
+ Map, String> map = new HashMap<>();
+ map.put(ClusterMetricsInfo.class, RMWSConsts.METRICS);
+ return map;
+ }
+
+ /**
+ * Update the policy of the queue.
+ *
+ * @param queueName name of the queue
+ * @param clusterInfo subClusterId map to cluster information about the
+ * SubCluster used to make policy decisions
+ * @param currentManager the FederationPolicyManager for the queue's existing
+ * policy the manager may be null, in which case the policy
+ * will need to be created.
+ *
+ * @return FederationPolicyManager.
+ */
+ @Override
+ protected FederationPolicyManager updatePolicy(String queueName,
+ Map> clusterInfo,
+ FederationPolicyManager currentManager) {
+ if (currentManager == null) {
+ LOG.info("Creating load based weighted policy queue {}.", queueName);
+ currentManager = getWeightedLocalityPolicyManager(queueName, clusterInfo);
+ } else if (currentManager instanceof WeightedLocalityPolicyManager) {
+ LOG.info("Updating load based weighted policy queue {}.", queueName);
+ currentManager = getWeightedLocalityPolicyManager(queueName, clusterInfo);
+ } else {
+ LOG.warn("Policy for queue {} is of type {}, expected {}.", queueName,
+ currentManager.getClass(), WeightedLocalityPolicyManager.class);
+ }
+ return currentManager;
+ }
+
+ /**
+ * GPG can help update the policy of the queue.
+ *
+ * We automatically generate the weight of the subCluster
+ * according to the clusterMetrics of the subCluster.
+ *
+ * @param queue queueName.
+ * @param subClusterMetricInfos Metric information of the subCluster.
+ * @return WeightedLocalityPolicyManager.
+ */
+ protected WeightedLocalityPolicyManager getWeightedLocalityPolicyManager(String queue,
+ Map> subClusterMetricInfos) {
+
+ // Parse the metric information of the subCluster.
+ Map clusterMetrics =
+ getSubClustersMetricsInfo(subClusterMetricInfos);
+
+ if (MapUtils.isEmpty(clusterMetrics)) {
+ return null;
+ }
+
+ // Get the new weight of the subCluster.
+ WeightedLocalityPolicyManager manager = new WeightedLocalityPolicyManager();
+ Map weights = getTargetWeights(clusterMetrics);
+ manager.setQueue(queue);
+ manager.getWeightedPolicyInfo().setAMRMPolicyWeights(weights);
+ manager.getWeightedPolicyInfo().setRouterPolicyWeights(weights);
+ return manager;
+ }
+
+ /**
+ * Get the ClusterMetric information of the subCluster.
+ *
+ * @param subClusterMetricsInfo subCluster Metric Information.
+ * @return Mapping relationship between subCluster and Metric.
+ */
+ protected Map getSubClustersMetricsInfo(
+ Map> subClusterMetricsInfo) {
+
+ // Check whether the Metric information of the sub-cluster is empty,
+ // if it is empty, we will directly return null.
+ if(MapUtils.isEmpty(subClusterMetricsInfo)) {
+ LOG.warn("The metric info of the subCluster is empty.");
+ return null;
+ }
+
+ Map clusterMetrics = new HashMap<>();
+ for (Map.Entry> entry : subClusterMetricsInfo.entrySet()) {
+ SubClusterId subClusterId = entry.getKey();
+ Map subClusterMetrics = entry.getValue();
+ ClusterMetricsInfo clusterMetricsInfo = (ClusterMetricsInfo)
+ subClusterMetrics.getOrDefault(ClusterMetricsInfo.class, null);
+ clusterMetrics.put(subClusterId, clusterMetricsInfo);
+ }
+
+ // return subCluster Metric Information.
+ return clusterMetrics;
+ }
+
+ /**
+ * Get subCluster target weight.
+ *
+ * @param clusterMetrics Metric of the subCluster.
+ * @return subCluster Weights.
+ */
+ @VisibleForTesting
+ protected Map getTargetWeights(
+ Map clusterMetrics) {
+ Map weights = GPGUtils.createUniformWeights(clusterMetrics.keySet());
+
+ List scs = new ArrayList<>(clusterMetrics.keySet());
+ // Sort the sub clusters into descending order based on pending load
+ scs.sort(new SortByDescendingLoad(clusterMetrics));
+
+ // Keep the top N loaded sub clusters
+ scs = scs.subList(0, Math.min(maxEdit, scs.size()));
+
+ for (SubClusterId sc : scs) {
+ LOG.info("Updating weight for sub cluster {}", sc.toString());
+ int pending = clusterMetrics.get(sc).getAppsPending();
+ if (pending <= minPending) {
+ LOG.info("Load ({}) is lower than minimum ({}), skipping", pending, minPending);
+ } else if (pending < maxPending) {
+ // The different scaling strategies should all map values from the
+ // range min_pending+1 to max_pending to the range min_weight to 1.0f
+ // so we pre-process and simplify the domain to some value [1, MAX-MIN)
+ int val = pending - minPending;
+ int maxVal = maxPending - minPending;
+
+ // Scale the weights to respect the config minimum
+ float weight = getWeightByScaling(maxVal, val);
+ weight = weight * (1.0f - minWeight);
+ weight += minWeight;
+ weights.put(new SubClusterIdInfo(sc), weight);
+ LOG.info("Load ({}) is within maximum ({}), setting weights via {} "
+ + "scale to {}", pending, maxPending, scaling, weight);
+ } else {
+ weights.put(new SubClusterIdInfo(sc), minWeight);
+ LOG.info("Load ({}) exceeded maximum ({}), setting weight to minimum: {}",
+ pending, maxPending, minWeight);
+ }
+ }
+ validateWeights(weights);
+ return weights;
+ }
+
+ /**
+ * Get weight information.
+ * We will calculate the weight information according to different Scaling.
+ *
+ * NONE: No calculation is required, and the weight is 1 at this time.
+ *
+ * LINEAR: For linear computation, we will use (maxPendingVal - curPendingVal) / (maxPendingVal).
+ *
+ * QUADRATIC: Calculated using quadratic,
+ * We will calculate quadratic for maxPendingVal, curPendingVal,
+ * then use this formula = (maxPendingVal - curPendingVal) / (maxPendingVal).
+ *
+ * LOG(LOGARITHM): Calculated using logarithm,
+ * We will calculate logarithm for maxPendingVal, curPendingVal,
+ * then use this formula = (maxPendingVal - curPendingVal) / (maxPendingVal).
+ *
+ * @param maxPendingVal maxPending - minPending
+ * @param curPendingVal pending - minPending
+ * @return Calculated weight information.
+ */
+ protected float getWeightByScaling(int maxPendingVal, int curPendingVal) {
+ float weight = 1.0f;
+ switch (scaling) {
+ case NONE:
+ break;
+ case LINEAR:
+ weight = (float) (maxPendingVal - curPendingVal) / (float) (maxPendingVal);
+ break;
+ case QUADRATIC:
+ double maxValQuad = Math.pow(maxPendingVal, 2);
+ double valQuad = Math.pow(curPendingVal, 2);
+ weight = (float) (maxValQuad - valQuad) / (float) (maxValQuad);
+ break;
+ case LOG:
+ double maxValLog = Math.log(maxPendingVal);
+ double valLog = Math.log(curPendingVal);
+ weight = (float) (maxValLog - valLog) / (float) (maxValLog);
+ break;
+ default:
+ LOG.warn("No suitable scaling found, Skip.");
+ break;
+ }
+ return weight;
+ }
+
+ /**
+ * Helper to avoid all zero weights. If weights are all zero, they're reset
+ * to one
+ * @param weights weights to validate
+ */
+ private void validateWeights(Map weights) {
+ for(Float w : weights.values()) {
+ // If we find a nonzero weight, we're validated
+ if(w > 0.0f) {
+ return;
+ }
+ }
+ LOG.warn("All {} generated weights were 0.0f. Resetting to 1.0f.", weights.size());
+ // All weights were zero. Reset all back to 1.0
+ weights.replaceAll((i, v) -> 1.0f);
+ }
+
+ private static final class SortByDescendingLoad
+ implements Comparator {
+
+ private Map clusterMetrics;
+
+ private SortByDescendingLoad(
+ Map clusterMetrics) {
+ this.clusterMetrics = clusterMetrics;
+ }
+
+ public int compare(SubClusterId a, SubClusterId b) {
+ // Sort by pending load
+ return clusterMetrics.get(b).getAppsPending() - clusterMetrics.get(a)
+ .getAppsPending();
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
index 33501fb1e3..3c94d6576e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/main/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/PolicyGenerator.java
@@ -129,15 +129,15 @@ public final void run() {
try {
manager = this.gpgContext.getPolicyFacade().getPolicyManager(queueName);
} catch (YarnException e) {
- LOG.error("GetPolicy for queue {} failed", queueName, e);
+ LOG.error("GetPolicy for queue {} failed.", queueName, e);
continue;
}
- LOG.info("Updating policy for queue {}", queueName);
+ LOG.info("Updating policy for queue {}.", queueName);
manager = policy.updatePolicy(queueName, clusterInfo, manager);
try {
this.gpgContext.getPolicyFacade().setPolicyManager(manager);
} catch (YarnException e) {
- LOG.error("SetPolicy for queue {} failed", queueName, e);
+ LOG.error("SetPolicy for queue {} failed.", queueName, e);
}
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java
new file mode 100644
index 0000000000..df58b30aaa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-globalpolicygenerator/src/test/java/org/apache/hadoop/yarn/server/globalpolicygenerator/policygenerator/TestLoadBasedGlobalPolicy.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_EDIT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MAX_PENDING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the Load Based Global Policy.
+ */
+public class TestLoadBasedGlobalPolicy {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestLoadBasedGlobalPolicy.class);
+
+ private static final int NUM_SC = 3;
+ private static final float DELTA = 0.00001f;
+
+ private static final int MIN_PENDING = 100;
+ private static final int MAX_PENDING = 500;
+
+ private List subClusterIds;
+ private Map clusterMetricsInfos;
+ private Map weights;
+
+ private final Configuration conf;
+ private final LoadBasedGlobalPolicy policyGenerator;
+
+ public TestLoadBasedGlobalPolicy() {
+ conf = new Configuration();
+ policyGenerator = new LoadBasedGlobalPolicy();
+ }
+
+ @Before
+ public void setUp() {
+
+ conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 2);
+ conf.setInt(FEDERATION_GPG_LOAD_BASED_MIN_PENDING, MIN_PENDING);
+ conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_PENDING, MAX_PENDING);
+ conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.0f);
+ conf.set(FEDERATION_GPG_LOAD_BASED_SCALING, LoadBasedGlobalPolicy.Scaling.LINEAR.name());
+ policyGenerator.setConf(conf);
+
+ subClusterIds = new ArrayList<>();
+ clusterMetricsInfos = new HashMap<>();
+ // Set up sub clusters
+ for (int i = 0; i < NUM_SC; ++i) {
+ // subClusterId
+ SubClusterId id = SubClusterId.newInstance("sc" + i);
+ subClusterIds.add(id);
+
+ // Cluster metrics info
+ ClusterMetricsInfo metricsInfo = new ClusterMetricsInfo();
+ metricsInfo.setAppsPending(50);
+ clusterMetricsInfos.put(id, metricsInfo);
+ }
+ }
+
+ @Test
+ public void testSimpleTargetWeights() {
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(1.0, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ }
+
+ @Test
+ public void testLoadTargetWeights() {
+ getMetric(0).setAppsPending(100);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(1.0, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ getMetric(0).setAppsPending(500);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(0.0, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ }
+
+ @Test
+ public void testMaxEdit() {
+ // The policy should be able to edit 2 weights
+ getMetric(0).setAppsPending(MAX_PENDING + 200);
+ getMetric(1).setAppsPending(MAX_PENDING + 100);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(0.0, getWeight(0), DELTA);
+ assertEquals(0.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ // After updating the config, it should only edit the most loaded
+ conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 1);
+ policyGenerator.setConf(conf);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(0.0, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ }
+
+ @Test
+ public void testMinWeight() {
+ // If a minimum weight is set, the generator should not go below it
+ conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.5f);
+ policyGenerator.setConf(conf);
+ getMetric(0).setAppsPending(Integer.MAX_VALUE);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(0.5, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ }
+
+ @Test
+ public void testScaling() {
+ LOG.info("Testing that the generator weights are monotonically"
+ + " decreasing regardless of scaling method");
+ for (LoadBasedGlobalPolicy.Scaling scaling :
+ new LoadBasedGlobalPolicy.Scaling[] {LoadBasedGlobalPolicy.Scaling.LINEAR,
+ LoadBasedGlobalPolicy.Scaling.QUADRATIC, LoadBasedGlobalPolicy.Scaling.LOG }) {
+ LOG.info("Testing {} scaling...", scaling);
+ conf.set(DEFAULT_FEDERATION_GPG_LOAD_BASED_SCALING, scaling.name());
+ policyGenerator.setConf(conf);
+ // Test a continuous range for scaling
+ float prevWeight = 1.01f;
+ for (int load = 0; load < MAX_PENDING * 2; ++load) {
+ getMetric(0).setAppsPending(load);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ if (load < MIN_PENDING) {
+ // Below the minimum load, it should stay 1.0f
+ assertEquals(1.0f, getWeight(0), DELTA);
+ } else if (load < MAX_PENDING) {
+ // In the specified range, the weight should consistently decrease
+ float weight = getWeight(0);
+ assertTrue(weight < prevWeight);
+ prevWeight = weight;
+ } else {
+ // Above the maximum load, it should stay 0.0f
+ assertEquals(0.0f, getWeight(0), DELTA);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testNonZero() {
+ // If all generated weights are zero, they should be set back to one
+ conf.setFloat(FEDERATION_GPG_LOAD_BASED_MIN_WEIGHT, 0.0f);
+ conf.setInt(FEDERATION_GPG_LOAD_BASED_MAX_EDIT, 3);
+ policyGenerator.setConf(conf);
+ getMetric(0).setAppsPending(Integer.MAX_VALUE);
+ getMetric(1).setAppsPending(Integer.MAX_VALUE);
+ getMetric(2).setAppsPending(Integer.MAX_VALUE);
+ weights = policyGenerator.getTargetWeights(clusterMetricsInfos);
+ assertEquals(weights.size(), 3);
+ assertEquals(1.0, getWeight(0), DELTA);
+ assertEquals(1.0, getWeight(1), DELTA);
+ assertEquals(1.0, getWeight(2), DELTA);
+ }
+
+ private float getWeight(int sc) {
+ return weights.get(new SubClusterIdInfo(subClusterIds.get(sc)));
+ }
+
+ private ClusterMetricsInfo getMetric(int sc) {
+ return clusterMetricsInfos.get(subClusterIds.get(sc));
+ }
+}