diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java index 879ccee3de..9347fd0a0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java @@ -59,13 +59,12 @@ public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() { /** * Setter for the {@link SubClusterPolicyConfiguration}. * - * @param federationPolicyConfiguration the - * {@link SubClusterPolicyConfiguration} - * to be used for initialization. + * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} + * to be used for initialization. */ - public void setFederationPolicyConfiguration( - SubClusterPolicyConfiguration federationPolicyConfiguration) { - this.federationPolicyConfiguration = federationPolicyConfiguration; + public void setSubClusterPolicyConfiguration( + SubClusterPolicyConfiguration fedPolicyConfiguration) { + this.federationPolicyConfiguration = fedPolicyConfiguration; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java similarity index 68% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java index fdc3857880..e5dba638f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java @@ -18,21 +18,29 @@ package org.apache.hadoop.yarn.server.federation.policies; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; - - import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; - -import org.apache.hadoop.yarn.server.federation.policies.router - .FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; /** - * Implementors of this interface are capable to instantiate and (re)initalize - * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} based on - * a {@link FederationPolicyInitializationContext}. The reason to bind these two - * policies together is to make sure we remain consistent across the router and - * amrmproxy policy decisions. + * + * Implementors need to provide the ability to serliaze a policy and its + * configuration as a {@link SubClusterPolicyConfiguration}, as well as + * provide (re)initialization mechanics for the underlying + * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}. + * + * The serialization aspects are used by admin APIs or a policy engine to + * store a serialized configuration in the {@code FederationStateStore}, + * while the getters methods are used to obtain a propertly inizialized + * policy in the {@code Router} and {@code AMRMProxy} respectively. + * + * This interface by design binds together + * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and + * provide lifecycle support for serialization and deserialization, to reduce + * configuration mistakes (combining incompatible policies). + * */ -public interface FederationPolicyConfigurator { +public interface FederationPolicyManager { /** * If the current instance is compatible, this method returns the same @@ -88,4 +96,31 @@ FederationRouterPolicy getRouterPolicy( FederationRouterPolicy oldInstance) throws FederationPolicyInitializationException; + /** + * This method is invoked to derive a {@link SubClusterPolicyConfiguration}. + * This is to be used when writing a policy object in the federation policy + * store. + * + * @return a valid policy configuration representing this object + * parametrization. + * + * @throws FederationPolicyInitializationException if the current state cannot + * be serialized properly + */ + SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException; + + + /** + * This method returns the queue this policy is configured for. + * @return the name of the queue. + */ + String getQueue(); + + /** + * This methods provides a setter for the queue this policy is specified for. + * @param queue the name of the queue. + */ + void setQueue(String queue); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java deleted file mode 100644 index 5034b7e257..0000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.yarn.server.federation.policies; - -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -/** - * Implementors of this class are able to serializeConf the configuraiton of a - * policy as a {@link SubClusterPolicyConfiguration}. This is used during the - * lifetime of a policy from the admin APIs or policy engine to serializeConf - * the policy into the policy store. - */ -public interface FederationPolicyWriter { - - /** - /** - * This method is invoked to derive a {@link SubClusterPolicyConfiguration}. - * This is to be used when writing a policy object in the federation policy - * store. - * - * @return a valid policy configuration representing this object - * parametrization. - * - * @throws FederationPolicyInitializationException if the current state cannot - * be serialized properly - */ - SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException; -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java new file mode 100644 index 0000000000..a0fa37f221 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.dao; + +import com.sun.jersey.api.json.JSONConfiguration; +import com.sun.jersey.api.json.JSONJAXBContext; +import com.sun.jersey.api.json.JSONMarshaller; +import com.sun.jersey.api.json.JSONUnmarshaller; +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.StringReader; +import java.io.StringWriter; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +/** + * This is a DAO class for the configuration of parameteres for federation + * policies. This generalizes several possible configurations as two lists of + * {@link SubClusterIdInfo} and corresponding weights as a + * {@link Float}. The interpretation of the weight is left to the logic in + * the policy. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +@XmlRootElement(name = "federation-policy") +@XmlAccessorType(XmlAccessType.FIELD) +public class WeightedPolicyInfo { + + private static final Logger LOG = + LoggerFactory.getLogger(WeightedPolicyInfo.class); + + private Map routerPolicyWeights = new HashMap<>(); + private Map amrmPolicyWeights = new HashMap<>(); + private float headroomAlpha; + + private static JSONJAXBContext jsonjaxbContext = initContext(); + + private static JSONJAXBContext initContext() { + try { + return new JSONJAXBContext(JSONConfiguration.DEFAULT, + WeightedPolicyInfo.class); + } catch (JAXBException e) { + LOG.error("Error parsing the policy.", e); + } + return null; + } + + public WeightedPolicyInfo() { + //JAXB needs this + } + + /** + * Setter method for Router weights. + * + * @param policyWeights the router weights. + */ + public void setRouterPolicyWeights( + Map policyWeights) { + this.routerPolicyWeights = policyWeights; + } + + /** + * Setter method for ARMRMProxy weights. + * + * @param policyWeights the amrmproxy weights. + */ + public void setAMRMPolicyWeights( + Map policyWeights) { + this.amrmPolicyWeights = policyWeights; + } + + /** + * Getter of the router weights. + * @return the router weights. + */ + public Map getRouterPolicyWeights() { + return routerPolicyWeights; + } + + /** + * Getter for AMRMProxy weights. + * @return the AMRMProxy weights. + */ + public Map getAMRMPolicyWeights() { + return amrmPolicyWeights; + } + + /** + * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON + * representation. + * + * @param bb the input byte representation. + * + * @return the {@link WeightedPolicyInfo} represented. + * + * @throws FederationPolicyInitializationException if a deserializaiton error + * occurs. + */ + public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb) + throws FederationPolicyInitializationException { + + if (jsonjaxbContext == null) { + throw new FederationPolicyInitializationException("JSONJAXBContext should" + + " not be null."); + } + + try { + JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller(); + final byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + String params = new String(bytes, Charset.forName("UTF-8")); + + WeightedPolicyInfo weightedPolicyInfo = unmarshaller + .unmarshalFromJSON(new StringReader(params), + WeightedPolicyInfo.class); + return weightedPolicyInfo; + } catch (JAXBException j) { + throw new FederationPolicyInitializationException(j); + } + } + + /** + * Converts the policy into a byte array representation in the input {@link + * ByteBuffer}. + * + * @return byte array representation of this policy configuration. + * + * @throws FederationPolicyInitializationException if a serialization error + * occurs. + */ + public ByteBuffer toByteBuffer() + throws FederationPolicyInitializationException { + if (jsonjaxbContext == null) { + throw new FederationPolicyInitializationException("JSONJAXBContext should" + + " not be null."); + } + try { + String s = toJSONString(); + return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8"))); + } catch (JAXBException j) { + throw new FederationPolicyInitializationException(j); + } + } + + private String toJSONString() throws JAXBException { + JSONMarshaller marshaller = jsonjaxbContext.createJSONMarshaller(); + marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true); + StringWriter sw = new StringWriter(256); + marshaller.marshallToJSON(this, sw); + return sw.toString(); + } + + @Override + public boolean equals(Object other) { + + if (other == null || !other.getClass().equals(this.getClass())) { + return false; + } + + WeightedPolicyInfo otherPolicy = + (WeightedPolicyInfo) other; + Map otherAMRMWeights = + otherPolicy.getAMRMPolicyWeights(); + Map otherRouterWeights = + otherPolicy.getRouterPolicyWeights(); + + boolean amrmWeightsMatch = otherAMRMWeights != null && + getAMRMPolicyWeights() != null && + CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(), + getAMRMPolicyWeights().entrySet()); + + boolean routerWeightsMatch = otherRouterWeights != null && + getRouterPolicyWeights() != null && + CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(), + getRouterPolicyWeights().entrySet()); + + return amrmWeightsMatch && routerWeightsMatch; + } + + @Override + public int hashCode() { + return 31 * amrmPolicyWeights.hashCode() + routerPolicyWeights.hashCode(); + } + + /** + * Return the parameter headroomAlpha, used by policies that balance + * weight-based and load-based considerations in their decisions. + * + * For policies that use this parameter, values close to 1 indicate that + * most of the decision should be based on currently observed headroom from + * various sub-clusters, values close to zero, indicate that the decision + * should be mostly based on weights and practically ignore current load. + * + * @return the value of headroomAlpha. + */ + public float getHeadroomAlpha() { + return headroomAlpha; + } + + /** + * Set the parameter headroomAlpha, used by policies that balance + * weight-based and load-based considerations in their decisions. + * + * For policies that use this parameter, values close to 1 indicate that + * most of the decision should be based on currently observed headroom from + * various sub-clusters, values close to zero, indicate that the decision + * should be mostly based on weights and practically ignore current load. + * + * @param headroomAlpha the value to use for balancing. + */ + public void setHeadroomAlpha(float headroomAlpha) { + this.headroomAlpha = headroomAlpha; + } + + @Override + public String toString() { + try { + return toJSONString(); + } catch (JAXBException e) { + e.printStackTrace(); + return "Error serializing to string."; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java new file mode 100644 index 0000000000..43f5b83e78 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** DAO objects for serializing/deserializing policy configurations. **/ +package org.apache.hadoop.yarn.server.federation.policies.dao; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java new file mode 100644 index 0000000000..e888979e08 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import java.util.Map; + +/** + * Abstract class provides common validation of reinitialize(), for all + * policies that are "weight-based". + */ +public abstract class BaseWeightedRouterPolicy + implements FederationRouterPolicy { + + private WeightedPolicyInfo policyInfo = null; + private FederationPolicyInitializationContext policyContext; + + public BaseWeightedRouterPolicy() { + } + + @Override + public void reinitialize(FederationPolicyInitializationContext + federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + // perform consistency checks + WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo + .fromByteBuffer( + federationPolicyContext.getSubClusterPolicyConfiguration() + .getParams()); + + // if nothing has changed skip the rest of initialization + if (policyInfo != null && policyInfo.equals(newPolicyInfo)) { + return; + } + + validate(newPolicyInfo); + setPolicyInfo(newPolicyInfo); + this.policyContext = federationPolicyContext; + } + + /** + * Overridable validation step for the policy configuration. + * @param newPolicyInfo the configuration to test. + * @throws FederationPolicyInitializationException if the configuration is + * not valid. + */ + public void validate(WeightedPolicyInfo newPolicyInfo) throws + FederationPolicyInitializationException { + if (newPolicyInfo == null) { + throw new FederationPolicyInitializationException("The policy to " + + "validate should not be null."); + } + Map newWeights = + newPolicyInfo.getRouterPolicyWeights(); + if (newWeights == null || newWeights.size() < 1) { + throw new FederationPolicyInitializationException( + "Weight vector cannot be null/empty."); + } + } + + + /** + * Getter method for the configuration weights. + * + * @return the {@link WeightedPolicyInfo} representing the policy + * configuration. + */ + public WeightedPolicyInfo getPolicyInfo() { + return policyInfo; + } + + /** + * Setter method for the configuration weights. + * + * @param policyInfo the {@link WeightedPolicyInfo} representing the policy + * configuration. + */ + public void setPolicyInfo( + WeightedPolicyInfo policyInfo) { + this.policyInfo = policyInfo; + } + + /** + * Getter method for the {@link FederationPolicyInitializationContext}. + * @return the context for this policy. + */ + public FederationPolicyInitializationContext getPolicyContext() { + return policyContext; + } + + /** + * Setter method for the {@link FederationPolicyInitializationContext}. + * @param policyContext the context to assign to this policy. + */ + public void setPolicyContext( + FederationPolicyInitializationContext policyContext) { + this.policyContext = policyContext; + } + + /** + * This methods gets active subclusters map from the {@code + * FederationStateStoreFacade} and validate it not being null/empty. + * + * @return the map of ids to info for all active subclusters. + * @throws YarnException if we can't get the list. + */ + protected Map getActiveSubclusters() + throws YarnException { + + Map activeSubclusters = getPolicyContext() + .getFederationStateStoreFacade().getSubClusters(true); + + if (activeSubclusters == null || activeSubclusters.size() < 1) { + throw new NoActiveSubclustersException( + "Zero active subclusters, cannot pick where to send job."); + } + return activeSubclusters; + } + + + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java new file mode 100644 index 0000000000..e57709f432 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +import java.util.Map; + +/** + * This implements a simple load-balancing policy. The policy "weights" are + * binary 0/1 values that enable/disable each sub-cluster, and the policy peaks + * the sub-cluster with the least load to forward this application. + */ +public class LoadBasedRouterPolicy + extends BaseWeightedRouterPolicy { + + private static final Log LOG = + LogFactory.getLog(LoadBasedRouterPolicy.class); + + @Override + public void reinitialize(FederationPolicyInitializationContext + federationPolicyContext) + throws FederationPolicyInitializationException { + + // remember old policyInfo + WeightedPolicyInfo tempPolicy = getPolicyInfo(); + + //attempt new initialization + super.reinitialize(federationPolicyContext); + + //check extra constraints + for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) { + if (weight != 0 && weight != 1) { + //reset to old policyInfo if check fails + setPolicyInfo(tempPolicy); + throw new FederationPolicyInitializationException( + this.getClass().getCanonicalName() + + " policy expects all weights to be either " + + "\"0\" or \"1\""); + } + } + } + + @Override + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) + throws YarnException { + + Map activeSubclusters = + getActiveSubclusters(); + + Map weights = getPolicyInfo() + .getRouterPolicyWeights(); + SubClusterIdInfo chosen = null; + long currBestMem = -1; + for (Map.Entry entry : + activeSubclusters + .entrySet()) { + SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); + if (weights.containsKey(id) && weights.get(id) > 0) { + long availableMemory = getAvailableMemory(entry.getValue()); + if (availableMemory > currBestMem) { + currBestMem = availableMemory; + chosen = id; + } + } + } + + return chosen.toId(); + } + + private long getAvailableMemory(SubClusterInfo value) + throws YarnException { + try { + long mem = -1; + JSONObject obj = new JSONObject(value.getCapability()); + mem = obj.getJSONObject("clusterMetrics").getLong("availableMB"); + return mem; + } catch (JSONException j) { + throw new YarnException("FederationSubCluserInfo cannot be parsed", j); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java new file mode 100644 index 0000000000..a8ac5f7864 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import java.util.Map; + +/** + * This implements a policy that interprets "weights" as a ordered list of + * preferences among sub-clusters. Highest weight among active subclusters is + * chosen. + */ +public class PriorityRouterPolicy + extends BaseWeightedRouterPolicy { + + private static final Log LOG = + LogFactory.getLog(PriorityRouterPolicy.class); + + @Override + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) + throws YarnException { + + Map activeSubclusters = + getActiveSubclusters(); + + // This finds the sub-cluster with the highest weight among the + // currently active ones. + Map weights = getPolicyInfo() + .getRouterPolicyWeights(); + SubClusterId chosen = null; + Float currentBest = Float.MIN_VALUE; + for (SubClusterId id : activeSubclusters.keySet()) { + SubClusterIdInfo idInfo = new SubClusterIdInfo(id); + if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) { + currentBest = weights.get(idInfo); + chosen = id; + } + } + + return chosen; + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java new file mode 100644 index 0000000000..1774961821 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * This simple policy picks at uniform random among any of the currently active + * subclusters. This policy is easy to use and good for testing. + * + * NOTE: this is "almost" subsumed by the {@code WeightedRandomRouterPolicy}. + * Behavior only diverges when there are active sub-clusters that are not part + * of the "weights", in which case the {@link UniformRandomRouterPolicy} send + * load to them, while {@code WeightedRandomRouterPolicy} does not. + */ +public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy { + + private Random rand; + + public UniformRandomRouterPolicy() { + rand = new Random(System.currentTimeMillis()); + } + + @Override + public void reinitialize( + FederationPolicyInitializationContext federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + //note: this overrides BaseWeighterRouterPolicy and ignores the weights + + setPolicyContext(federationPolicyContext); + } + + /** + * Simply picks a random active subcluster to start the AM (this does NOT + * depend on the weights in the policy). + * + * @param appSubmissionContext the context for the app being submitted + * (ignored). + * + * @return a randomly chosen subcluster. + * + * @throws YarnException if there are no active subclusters. + */ + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) + throws YarnException { + + Map activeSubclusters = + getActiveSubclusters(); + + List list = + new ArrayList<>(activeSubclusters.keySet()); + return list.get(rand.nextInt(list.size())); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java new file mode 100644 index 0000000000..077767784f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +import java.util.Map; +import java.util.Random; + +/** + * This policy implements a weighted random sample among currently active + * sub-clusters. + */ +public class WeightedRandomRouterPolicy + extends BaseWeightedRouterPolicy { + + private static final Log LOG = + LogFactory.getLog(WeightedRandomRouterPolicy.class); + private Random rand = new Random(System.currentTimeMillis()); + + @Override + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) + throws YarnException { + + Map activeSubclusters = + getActiveSubclusters(); + + // note: we cannot pre-compute the weights, as the set of activeSubcluster + // changes dynamically (and this would unfairly spread the load to + // sub-clusters adjacent to an inactive one), hence we need to count/scan + // the list and based on weight pick the next sub-cluster. + Map weights = getPolicyInfo() + .getRouterPolicyWeights(); + + float totActiveWeight = 0; + for(Map.Entry entry : weights.entrySet()){ + if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey() + .toId())){ + totActiveWeight += entry.getValue(); + } + } + float lookupValue = rand.nextFloat() * totActiveWeight; + + for (SubClusterId id : activeSubclusters.keySet()) { + SubClusterIdInfo idInfo = new SubClusterIdInfo(id); + if (weights.containsKey(idInfo)) { + lookupValue -= weights.get(idInfo); + } + if (lookupValue <= 0) { + return id; + } + } + //should never happen + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java new file mode 100644 index 0000000000..e2260a1f45 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * This class represent a sub-cluster identifier in the JSON representation + * of the policy configuration. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +@XmlRootElement(name = "federation-policy") +@XmlAccessorType(XmlAccessType.FIELD) +public class SubClusterIdInfo { + + private String id; + + public SubClusterIdInfo() { + //JAXB needs this + } + + public SubClusterIdInfo(String subClusterId) { + this.id = subClusterId; + } + + public SubClusterIdInfo(SubClusterId subClusterId) { + this.id = subClusterId.getId(); + } + + /** + * Get the sub-cluster identifier as {@link SubClusterId}. + * @return the sub-cluster id. + */ + public SubClusterId toId() { + return SubClusterId.newInstance(id); + } + + @Override + public boolean equals(Object other) { + if (other instanceof SubClusterIdInfo) { + if (((SubClusterIdInfo) other).id.equals(this.id)) { + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java new file mode 100644 index 0000000000..8da92b9876 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies; + +import static org.mockito.Mockito.mock; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; +import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Test; + +/** + * Base class for policies tests, tests for common reinitialization cases. + */ +public abstract class BaseFederationPoliciesTest { + + private ConfigurableFederationPolicy policy; + private WeightedPolicyInfo policyInfo; + private Map activeSubclusters = new HashMap<>(); + private FederationPolicyInitializationContext federationPolicyContext; + private ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + private Random rand = new Random(); + + @Test + public void testReinitilialize() throws YarnException { + FederationPolicyInitializationContext fpc = + new FederationPolicyInitializationContext(); + ByteBuffer buf = getPolicyInfo().toByteBuffer(); + fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration + .newInstance("queue1", getPolicy().getClass().getCanonicalName(), buf)); + fpc.setFederationSubclusterResolver( + FederationPoliciesTestUtil.initResolver()); + fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); + getPolicy().reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad1() throws YarnException { + getPolicy().reinitialize(null); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad2() throws YarnException { + FederationPolicyInitializationContext fpc = + new FederationPolicyInitializationContext(); + getPolicy().reinitialize(fpc); + } + + @Test(expected = FederationPolicyInitializationException.class) + public void testReinitilializeBad3() throws YarnException { + FederationPolicyInitializationContext fpc = + new FederationPolicyInitializationContext(); + ByteBuffer buf = mock(ByteBuffer.class); + fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration + .newInstance("queue1", "WrongPolicyName", buf)); + fpc.setFederationSubclusterResolver( + FederationPoliciesTestUtil.initResolver()); + fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); + getPolicy().reinitialize(fpc); + } + + @Test(expected = NoActiveSubclustersException.class) + public void testNoSubclusters() throws YarnException { + // empty the activeSubclusters map + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), new HashMap<>()); + + ConfigurableFederationPolicy currentPolicy = getPolicy(); + if (currentPolicy instanceof FederationRouterPolicy) { + ((FederationRouterPolicy) currentPolicy) + .getHomeSubcluster(getApplicationSubmissionContext()); + } + } + + public ConfigurableFederationPolicy getPolicy() { + return policy; + } + + public void setPolicy(ConfigurableFederationPolicy policy) { + this.policy = policy; + } + + public WeightedPolicyInfo getPolicyInfo() { + return policyInfo; + } + + public void setPolicyInfo(WeightedPolicyInfo policyInfo) { + this.policyInfo = policyInfo; + } + + public Map getActiveSubclusters() { + return activeSubclusters; + } + + public void setActiveSubclusters( + Map activeSubclusters) { + this.activeSubclusters = activeSubclusters; + } + + public FederationPolicyInitializationContext getFederationPolicyContext() { + return federationPolicyContext; + } + + public void setFederationPolicyContext( + FederationPolicyInitializationContext federationPolicyContext) { + this.federationPolicyContext = federationPolicyContext; + } + + public ApplicationSubmissionContext getApplicationSubmissionContext() { + return applicationSubmissionContext; + } + + public void setApplicationSubmissionContext( + ApplicationSubmissionContext applicationSubmissionContext) { + this.applicationSubmissionContext = applicationSubmissionContext; + } + + public Random getRand() { + return rand; + } + + public void setRand(Random rand) { + this.rand = rand; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index 4ec04d5533..e840b3fee0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -77,7 +77,7 @@ public void wrongType() throws Exception { @Test(expected = FederationPolicyInitializationException.class) public void nullConf() throws Exception { - context.setFederationPolicyConfiguration(null); + context.setSubClusterPolicyConfiguration(null); FederationPolicyInitializationContextValidator.validate(context, MockPolicyManager.class.getCanonicalName()); } @@ -96,8 +96,8 @@ public void nullFacade() throws Exception { MockPolicyManager.class.getCanonicalName()); } - private class MockPolicyManager - implements FederationPolicyWriter, FederationPolicyConfigurator { + private class MockPolicyManager implements FederationPolicyManager { + @Override public FederationAMRMProxyPolicy getAMRMPolicy( FederationPolicyInitializationContext @@ -123,6 +123,17 @@ public SubClusterPolicyConfiguration serializeConf() return SubClusterPolicyConfiguration .newInstance("queue1", this.getClass().getCanonicalName(), buf); } + + @Override + public String getQueue() { + return "default"; + } + + @Override + public void setQueue(String queue) { + + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java new file mode 100644 index 0000000000..9e94f72587 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the + * load is properly considered for allocation. + */ +public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new LoadBasedRouterPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + Map routerWeights = new HashMap<>(); + Map amrmWeights = new HashMap<>(); + + // simulate 20 active subclusters + for (int i = 0; i < 20; i++) { + SubClusterIdInfo sc = + new SubClusterIdInfo(String.format("sc%02d", i)); + SubClusterInfo federationSubClusterInfo = + SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1, + SubClusterState.SC_RUNNING, -1, + generateClusterMetricsInfo(i)); + getActiveSubclusters().put(sc.toId(), federationSubClusterInfo); + float weight = getRand().nextInt(2); + if (i == 5) { + weight = 1.0f; + } + + // 5% chance we omit one of the weights + if (i <= 5 || getRand().nextFloat() > 0.05f) { + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + } + } + getPolicyInfo().setRouterPolicyWeights(routerWeights); + getPolicyInfo().setAMRMPolicyWeights(amrmWeights); + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), getActiveSubclusters()); + + } + + private String generateClusterMetricsInfo(int id) { + + long mem = 1024 * getRand().nextInt(277 * 100 - 1); + //plant a best cluster + if (id == 5) { + mem = 1024 * 277 * 100; + } + String clusterMetrics = + "{\"clusterMetrics\":{\"appsSubmitted\":65," + "\"appsCompleted\":64," + + "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0," + + "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + mem + "," + + "\"allocatedMB\":0,\"reservedVirtualCores\":0," + + "\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0," + + "\"containersAllocated\":0,\"containersReserved\":0," + + "\"containersPending\":0,\"totalMB\":28364800," + + "\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1," + + "\"unhealthyNodes\":0,\"decommissionedNodes\":0," + + "\"rebootedNodes\":0,\"activeNodes\":277}}\n"; + + return clusterMetrics; + + } + + @Test + public void testLoadIsRespected() throws YarnException { + + SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(getApplicationSubmissionContext()); + + // check the "planted" best cluster is chosen + Assert.assertEquals("sc05", chosen.getId()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java new file mode 100644 index 0000000000..ff5175d01b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Simple test class for the {@link PriorityRouterPolicy}. Tests that the + * weights are correctly used for ordering the choice of sub-clusters. + */ +public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new PriorityRouterPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + Map routerWeights = new HashMap<>(); + Map amrmWeights = new HashMap<>(); + + // simulate 20 subclusters with a 5% chance of being inactive + for (int i = 0; i < 20; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + + // with 5% omit a subcluster + if (getRand().nextFloat() < 0.95f || i == 5) { + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + float weight = getRand().nextFloat(); + if (i == 5) { + weight = 1.1f; // guaranteed to be the largest. + } + + // 5% chance we omit one of the weights + if (i <= 5 || getRand().nextFloat() > 0.05f) { + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + } + } + getPolicyInfo().setRouterPolicyWeights(routerWeights); + getPolicyInfo().setAMRMPolicyWeights(amrmWeights); + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), + getActiveSubclusters()); + + } + + @Test + public void testPickLowestWeight() throws YarnException { + SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(getApplicationSubmissionContext()); + Assert.assertEquals("sc5", chosen.getId()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java new file mode 100644 index 0000000000..ac41ab550d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one + * of the active subcluster is chosen. + */ +public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new UniformRandomRouterPolicy()); + // needed for base test to work + setPolicyInfo(mock(WeightedPolicyInfo.class)); + for (int i = 1; i <= 2; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + mock(WeightedPolicyInfo.class), getActiveSubclusters()); + } + + @Test + public void testOneSubclusterIsChosen() throws YarnException { + SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(getApplicationSubmissionContext()); + Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java new file mode 100644 index 0000000000..a612685b36 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large + * number of randomized tests to check we are weighiting correctly even if + * clusters go inactive. + */ +public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { + + @Before + public void setUp() throws Exception { + setPolicy(new WeightedRandomRouterPolicy()); + setPolicyInfo(new WeightedPolicyInfo()); + Map routerWeights = new HashMap<>(); + Map amrmWeights = new HashMap<>(); + + // simulate 20 subclusters with a 5% chance of being inactive + for (int i = 0; i < 20; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + // with 5% omit a subcluster + if (getRand().nextFloat() < 0.95f) { + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + // 5% chance we omit one of the weights + float weight = getRand().nextFloat(); + if (i <= 5 || getRand().nextFloat() > 0.05f) { + routerWeights.put(sc, weight); + amrmWeights.put(sc, weight); + } + } + getPolicyInfo().setRouterPolicyWeights(routerWeights); + getPolicyInfo().setAMRMPolicyWeights(amrmWeights); + + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), + getActiveSubclusters()); + + } + + @Test + public void testClusterChosenWithRightProbability() throws YarnException { + + Map counter = new HashMap<>(); + for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights() + .keySet()) { + counter.put(id.toId(), new AtomicLong(0)); + } + + float numberOfDraws = 1000000; + + for (float i = 0; i < numberOfDraws; i++) { + SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()). + getHomeSubcluster(getApplicationSubmissionContext()); + counter.get(chosenId).incrementAndGet(); + } + + float totalActiveWeight = 0; + for (SubClusterId id : getActiveSubclusters().keySet()) { + SubClusterIdInfo idInfo = new SubClusterIdInfo(id); + if (getPolicyInfo().getRouterPolicyWeights().containsKey(idInfo)) { + totalActiveWeight += + getPolicyInfo().getRouterPolicyWeights().get(idInfo); + } + } + + for (Map.Entry counterEntry : counter + .entrySet()) { + float expectedWeight = getPolicyInfo().getRouterPolicyWeights() + .get(new SubClusterIdInfo(counterEntry.getKey())) / totalActiveWeight; + float actualWeight = counterEntry.getValue().floatValue() / numberOfDraws; + + // make sure that the weights is respected among active subclusters + // and no jobs are routed to inactive subclusters. + if (getActiveSubclusters().containsKey(counterEntry.getKey())) { + Assert.assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight + + " expected weight: " + expectedWeight, expectedWeight == 0 || + (actualWeight / expectedWeight) < 1.1 + && (actualWeight / expectedWeight) > 0.9); + } else { + Assert.assertTrue( + "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight + + " expected weight: " + expectedWeight, actualWeight == 0); + + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java index 8c2115be53..f901329dea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java @@ -19,13 +19,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; -import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.*; import java.net.URL; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -41,6 +48,41 @@ private FederationPoliciesTestUtil() { // disabled. } + + public static void initializePolicyContext( + FederationPolicyInitializationContext fpc, ConfigurableFederationPolicy + policy, WeightedPolicyInfo policyInfo, + Map activeSubclusters) + throws YarnException { + ByteBuffer buf = policyInfo.toByteBuffer(); + fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration + .newInstance("queue1", policy.getClass().getCanonicalName(), buf)); + FederationStateStoreFacade facade = FederationStateStoreFacade + .getInstance(); + FederationStateStore fss = mock(FederationStateStore.class); + + if (activeSubclusters == null) { + activeSubclusters = new HashMap(); + } + GetSubClustersInfoResponse response = GetSubClustersInfoResponse + .newInstance(new ArrayList(activeSubclusters.values())); + + when(fss.getSubClusters(any())).thenReturn(response); + facade.reinitialize(fss, new Configuration()); + fpc.setFederationStateStoreFacade(facade); + policy.reinitialize(fpc); + } + + public static void initializePolicyContext( + ConfigurableFederationPolicy policy, + WeightedPolicyInfo policyInfo, Map activeSubclusters) throws YarnException { + FederationPolicyInitializationContext context = + new FederationPolicyInitializationContext(null, initResolver(), + initFacade()); + initializePolicyContext(context, policy, policyInfo, activeSubclusters); + } + /** * Initialize a {@link SubClusterResolver}. * @@ -62,6 +104,45 @@ public static SubClusterResolver initResolver() { return resolver; } + /** + * Initialiaze a main-memory {@link FederationStateStoreFacade} used for + * testing, wiht a mock resolver. + * + * @param subClusterInfos the list of subclusters to be served on + * getSubClusters invocations. + * + * @return the facade. + * + * @throws YarnException in case the initialization is not successful. + */ + + public static FederationStateStoreFacade initFacade( + List subClusterInfos, SubClusterPolicyConfiguration + policyConfiguration) throws YarnException { + FederationStateStoreFacade goodFacade = FederationStateStoreFacade + .getInstance(); + FederationStateStore fss = mock(FederationStateStore.class); + GetSubClustersInfoResponse response = GetSubClustersInfoResponse + .newInstance(subClusterInfos); + when(fss.getSubClusters(any())).thenReturn(response); + + List configurations = new ArrayList<>(); + configurations.add(policyConfiguration); + + GetSubClusterPoliciesConfigurationsResponse policiesResponse = + GetSubClusterPoliciesConfigurationsResponse + .newInstance(configurations); + when(fss.getPoliciesConfigurations(any())).thenReturn(policiesResponse); + + GetSubClusterPolicyConfigurationResponse policyResponse = + GetSubClusterPolicyConfigurationResponse + .newInstance(policyConfiguration); + when(fss.getPolicyConfiguration(any())).thenReturn(policyResponse); + + goodFacade.reinitialize(fss, new Configuration()); + return goodFacade; + } + /** * Initialiaze a main-memory {@link FederationStateStoreFacade} used for * testing, wiht a mock resolver. @@ -71,13 +152,8 @@ public static SubClusterResolver initResolver() { * @throws YarnException in case the initialization is not successful. */ public static FederationStateStoreFacade initFacade() throws YarnException { - FederationStateStoreFacade goodFacade = FederationStateStoreFacade - .getInstance(); - FederationStateStore fss = mock(FederationStateStore.class); - GetSubClustersInfoResponse response = GetSubClustersInfoResponse - .newInstance(new ArrayList<>()); - when(fss.getSubClusters(any())).thenReturn(response); - goodFacade.reinitialize(fss, new Configuration()); - return goodFacade; + return initFacade(new ArrayList<>(), mock(SubClusterPolicyConfiguration + .class)); } + }