From 53c3ae1c8930f2ed4fdd1c4e59622cfa37fd3fea Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Sun, 19 Nov 2023 10:04:46 +0800 Subject: [PATCH] YARN-11610. [Federation] Add WeightedHomePolicyManager. (#6256) Contributed by Shilun Fan. Reviewed-by: Inigo Goiri Signed-off-by: Shilun Fan --- .../manager/WeightedHomePolicyManager.java | 65 +++++++++++++++++++ .../TestWeightedHomePolicyManager.java | 63 ++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedHomePolicyManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java new file mode 100644 index 0000000000..370594ec32 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedHomePolicyManager.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; + +import java.nio.ByteBuffer; + +/** + * Policy that allows operator to configure "weights" for routing. This picks a + * {@link WeightedRandomRouterPolicy} for the router and a {@link + * HomeAMRMProxyPolicy} for the amrmproxy as they are designed to + * work together. + */ +public class WeightedHomePolicyManager extends AbstractPolicyManager { + + private WeightedPolicyInfo weightedPolicyInfo; + + public WeightedHomePolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = WeightedRandomRouterPolicy.class; + amrmProxyFederationPolicy = HomeAMRMProxyPolicy.class; + weightedPolicyInfo = new WeightedPolicyInfo(); + } + + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + ByteBuffer buf = weightedPolicyInfo.toByteBuffer(); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } + + @VisibleForTesting + public WeightedPolicyInfo getWeightedPolicyInfo() { + return weightedPolicyInfo; + } + + @VisibleForTesting + public void setWeightedPolicyInfo( + WeightedPolicyInfo weightedPolicyInfo) { + this.weightedPolicyInfo = weightedPolicyInfo; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedHomePolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedHomePolicyManager.java new file mode 100644 index 0000000000..3d0c5895f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedHomePolicyManager.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.HomeAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class TestWeightedHomePolicyManager extends BasePolicyManagerTest { + private WeightedPolicyInfo policyInfo; + + @Before + public void setup() { + // configure a policy + WeightedHomePolicyManager whpm = new WeightedHomePolicyManager(); + whpm.setQueue("queue1"); + + SubClusterId sc1 = SubClusterId.newInstance("sc1"); + policyInfo = new WeightedPolicyInfo(); + Map routerWeights = new HashMap<>(); + routerWeights.put(new SubClusterIdInfo(sc1), 0.2f); + policyInfo.setRouterPolicyWeights(routerWeights); + + whpm.setWeightedPolicyInfo(policyInfo); + this.wfp = whpm; + + //set expected params that the base test class will use for tests + expectedPolicyManager = WeightedHomePolicyManager.class; + expectedAMRMProxyPolicy = HomeAMRMProxyPolicy.class; + expectedRouterPolicy = WeightedRandomRouterPolicy.class; + } + + @Test + public void testPolicyInfoSetCorrectly() throws Exception { + serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, + expectedAMRMProxyPolicy, expectedRouterPolicy); + // check the policyInfo propagates through ser/der correctly + Assert.assertEquals(((WeightedHomePolicyManager) wfp) + .getWeightedPolicyInfo(), policyInfo); + } +}