From 4128c9522dcdc16bb3527f74a48ed1242458a165 Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Tue, 22 Nov 2016 15:02:22 -0800 Subject: [PATCH] YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru). (cherry picked from commit 575137f41c27eb72d05d923337f3030a35403e8f) --- .../hadoop/yarn/conf/YarnConfiguration.java | 3 +- .../policies/RouterPolicyFacade.java | 1 + .../{ => manager}/AbstractPolicyManager.java | 17 +++- .../FederationPolicyManager.java | 3 +- .../manager/HashBroadcastPolicyManager.java | 38 +++++++++ .../PriorityBroadcastPolicyManager.java | 2 +- .../UniformBroadcastPolicyManager.java | 18 +--- .../WeightedLocalityPolicyManager.java | 2 +- .../policies/manager/package-info.java | 19 +++++ .../policies/router/AbstractRouterPolicy.java | 19 +++++ .../router/HashBasedRouterPolicy.java | 81 ++++++++++++++++++ .../router/LoadBasedRouterPolicy.java | 3 + .../policies/router/PriorityRouterPolicy.java | 3 + .../router/UniformRandomRouterPolicy.java | 10 ++- .../router/WeightedRandomRouterPolicy.java | 3 + .../policies/BaseFederationPoliciesTest.java | 17 +++- ...nPolicyInitializationContextValidator.java | 1 + .../policies/TestRouterPolicyFacade.java | 2 + .../{ => manager}/BasePolicyManagerTest.java | 20 ++--- .../TestHashBasedBroadcastPolicyManager.java | 40 +++++++++ .../TestPriorityBroadcastPolicyManager.java | 2 +- .../TestUniformBroadcastPolicyManager.java | 2 +- .../TestWeightedLocalityPolicyManager.java | 2 +- .../router/BaseRouterPoliciesTest.java | 51 ++++++++++++ .../router/TestHashBasedRouterPolicy.java | 83 +++++++++++++++++++ .../router/TestLoadBasedRouterPolicy.java | 3 +- .../router/TestPriorityRouterPolicy.java | 3 +- .../router/TestUniformRandomRouterPolicy.java | 3 +- .../TestWeightedRandomRouterPolicy.java | 15 ++-- 29 files changed, 414 insertions(+), 52 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/AbstractPolicyManager.java (89%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/FederationPolicyManager.java (96%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/PriorityBroadcastPolicyManager.java (97%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/UniformBroadcastPolicyManager.java (69%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/WeightedLocalityPolicyManager.java (97%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/BasePolicyManagerTest.java (88%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/TestPriorityBroadcastPolicyManager.java (97%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/TestUniformBroadcastPolicyManager.java (95%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/{ => manager}/TestWeightedLocalityPolicyManager.java (97%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 33bde5491a..18c34441c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2600,7 +2600,8 @@ public static boolean isAclEnabled(Configuration conf) { + "policy-manager"; public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache" - + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager"; + + ".hadoop.yarn.server.federation.policies" + + ".manager.UniformBroadcastPolicyManager"; public static final String FEDERATION_POLICY_MANAGER_PARAMS = FEDERATION_PREFIX + "policy-manager-params"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index a3fd15a3b5..8c2262317b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java similarity index 89% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java index e77f2e33ae..f7a89c614f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java @@ -15,8 +15,11 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; @@ -24,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; + /** * This class provides basic implementation for common methods that multiple * policies will need to implement. @@ -112,6 +117,16 @@ public FederationRouterPolicy getRouterPolicy( } } + @Override + public SubClusterPolicyConfiguration serializeConf() + throws FederationPolicyInitializationException { + // default implementation works only for sub-classes which do not require + // any parameters + ByteBuffer buf = ByteBuffer.allocate(0); + return SubClusterPolicyConfiguration + .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); + } + @Override public String getQueue() { return queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java similarity index 96% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java index 39fdba33b9..1434c80f42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java @@ -15,8 +15,9 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java new file mode 100644 index 0000000000..08ab08fedf --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy; + +/** + * Policy that routes applications via hashing of their queuename, and broadcast + * resource requests. This picks a {@link HashBasedRouterPolicy} for the router + * and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed + * to work together. + */ +public class HashBroadcastPolicyManager extends AbstractPolicyManager { + + public HashBroadcastPolicyManager() { + // this structurally hard-codes two compatible policies for Router and + // AMRMProxy. + routerFederationPolicy = HashBasedRouterPolicy.class; + amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java index ebdcf420d4..8139e1202d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import java.nio.ByteBuffer; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java similarity index 69% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java index a01f8fa8b5..5db0466bd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java @@ -15,14 +15,10 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; -import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; - -import java.nio.ByteBuffer; /** * This class represents a simple implementation of a {@code @@ -36,21 +32,13 @@ * containers than a job requested as all requests are (replicated and) * broadcasted. */ -public class UniformBroadcastPolicyManager - extends AbstractPolicyManager { +public class UniformBroadcastPolicyManager extends AbstractPolicyManager { public UniformBroadcastPolicyManager() { - //this structurally hard-codes two compatible policies for Router and + // this structurally hard-codes two compatible policies for Router and // AMRMProxy. routerFederationPolicy = UniformRandomRouterPolicy.class; amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class; } - @Override - public SubClusterPolicyConfiguration serializeConf() - throws FederationPolicyInitializationException { - ByteBuffer buf = ByteBuffer.allocate(0); - return SubClusterPolicyConfiguration - .newInstance(getQueue(), this.getClass().getCanonicalName(), buf); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java index f3c6673287..109b53437c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java new file mode 100644 index 0000000000..9515c01181 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Various implementation of FederationPolicyManager. **/ +package org.apache.hadoop.yarn.server.federation.policies.manager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java index f49af1d56e..730fb417f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -20,8 +20,11 @@ import java.util.Map; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -44,4 +47,20 @@ public void validate(WeightedPolicyInfo newPolicyInfo) } } + public void validate(ApplicationSubmissionContext appSubmissionContext) + throws FederationPolicyException { + + if (appSubmissionContext == null) { + throw new FederationPolicyException( + "Cannot route an application with null context."); + } + + // if the queue is not specified we set it to default value, to be + // compatible with YARN behavior. + String queue = appSubmissionContext.getQueue(); + if (queue == null) { + appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java new file mode 100644 index 0000000000..e40e87ea3a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; + +/** + * This {@link FederationRouterPolicy} pick a subcluster based on the hash of + * the job's queue name. Useful to provide a default behavior when too many + * queues exist in a system. This also ensures that all jobs belonging to a + * queue are mapped to the same sub-cluster (likely help with locality). + */ +public class HashBasedRouterPolicy extends AbstractRouterPolicy { + + @Override + public void reinitialize( + FederationPolicyInitializationContext federationPolicyContext) + throws FederationPolicyInitializationException { + FederationPolicyInitializationContextValidator + .validate(federationPolicyContext, this.getClass().getCanonicalName()); + + // note: this overrides BaseRouterPolicy and ignores the weights + setPolicyContext(federationPolicyContext); + } + + /** + * Simply picks from alphabetically-sorted active subclusters based on the + * hash of quey name. Jobs of the same queue will all be routed to the same + * sub-cluster, as far as the number of active sub-cluster and their names + * remain the same. + * + * @param appSubmissionContext the context for the app being submitted. + * + * @return a hash-based chosen subcluster. + * + * @throws YarnException if there are no active subclusters. + */ + public SubClusterId getHomeSubcluster( + ApplicationSubmissionContext appSubmissionContext) throws YarnException { + + // throws if no active subclusters available + Map activeSubclusters = + getActiveSubclusters(); + + validate(appSubmissionContext); + + int chosenPosition = Math.abs( + appSubmissionContext.getQueue().hashCode() % activeSubclusters.size()); + + List list = new ArrayList<>(activeSubclusters.keySet()); + Collections.sort(list); + return list.get(chosenPosition); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index 5de749fdec..2ca15bf045 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -64,6 +64,9 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index bc3a1f790f..13d9140243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index b8f9cc329d..d8204498ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -48,11 +48,10 @@ public UniformRandomRouterPolicy() { } @Override - public void reinitialize( - FederationPolicyInitializationContext policyContext) + public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException { - FederationPolicyInitializationContextValidator - .validate(policyContext, this.getClass().getCanonicalName()); + FederationPolicyInitializationContextValidator.validate(policyContext, + this.getClass().getCanonicalName()); // note: this overrides AbstractRouterPolicy and ignores the weights @@ -73,6 +72,9 @@ public void reinitialize( public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index ac75ae9cf8..5727134a83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext) throws YarnException { + // null checks and default-queue behavior + validate(appSubmissionContext); + Map activeSubclusters = getActiveSubclusters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index ba897da71f..6bd8bf0d96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.federation.policies; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.HashMap; @@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.junit.Test; @@ -46,7 +49,7 @@ public abstract class BaseFederationPoliciesTest { private ConfigurableFederationPolicy policy; - private WeightedPolicyInfo policyInfo; + private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class); private Map activeSubclusters = new HashMap<>(); private FederationPolicyInitializationContext federationPolicyContext; private ApplicationSubmissionContext applicationSubmissionContext = @@ -103,7 +106,7 @@ public void testNoSubclusters() throws YarnException { ((FederationRouterPolicy) localPolicy) .getHomeSubcluster(getApplicationSubmissionContext()); } else { - String[] hosts = new String[] {"host1", "host2" }; + String[] hosts = new String[] {"host1", "host2"}; List resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); ((FederationAMRMProxyPolicy) localPolicy) @@ -170,4 +173,14 @@ public void setHomeSubCluster(SubClusterId homeSubCluster) { this.homeSubCluster = homeSubCluster; } + public void setMockActiveSubclusters(int numSubclusters) { + for (int i = 1; i <= numSubclusters; i++) { + SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i); + SubClusterInfo sci = mock(SubClusterInfo.class); + when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING); + when(sci.getSubClusterId()).thenReturn(sc.toId()); + getActiveSubclusters().put(sc.toId(), sci); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java index d906b92c02..611a48611e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java index 4975a9fb81..5fa02d60f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.policies.manager.PriorityBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java similarity index 88% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java index c6098865fc..3cf73b6775 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/BasePolicyManagerTest.java @@ -15,8 +15,9 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy; @@ -32,7 +33,6 @@ */ public abstract class BasePolicyManagerTest { - @SuppressWarnings("checkstyle:visibilitymodifier") protected FederationPolicyManager wfp = null; @SuppressWarnings("checkstyle:visibilitymodifier") @@ -42,12 +42,10 @@ public abstract class BasePolicyManagerTest { @SuppressWarnings("checkstyle:visibilitymodifier") protected Class expectedRouterPolicy; - @Test public void testSerializeAndInstantiate() throws Exception { serializeAndDeserializePolicyManager(wfp, expectedPolicyManager, - expectedAMRMProxyPolicy, - expectedRouterPolicy); + expectedAMRMProxyPolicy, expectedRouterPolicy); } @Test(expected = FederationPolicyInitializationException.class) @@ -73,11 +71,10 @@ protected static void serializeAndDeserializePolicyManager( Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception { // serializeConf it in a context - SubClusterPolicyConfiguration fpc = - wfp.serializeConf(); + SubClusterPolicyConfiguration fpc = wfp.serializeConf(); fpc.setType(policyManagerType.getCanonicalName()); - FederationPolicyInitializationContext context = new - FederationPolicyInitializationContext(); + FederationPolicyInitializationContext context = + new FederationPolicyInitializationContext(); context.setSubClusterPolicyConfiguration(fpc); context .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade()); @@ -92,7 +89,7 @@ protected static void serializeAndDeserializePolicyManager( FederationAMRMProxyPolicy federationAMRMProxyPolicy = wfp2.getAMRMPolicy(context, null); - //needed only for tests (getARMRMPolicy change the "type" in conf) + // needed only for tests (getARMRMPolicy change the "type" in conf) fpc.setType(wfp.getClass().getCanonicalName()); FederationRouterPolicy federationRouterPolicy = @@ -101,8 +98,7 @@ protected static void serializeAndDeserializePolicyManager( Assert.assertEquals(federationAMRMProxyPolicy.getClass(), expAMRMProxyPolicy); - Assert.assertEquals(federationRouterPolicy.getClass(), - expRouterPolicy); + Assert.assertEquals(federationRouterPolicy.getClass(), expRouterPolicy); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java new file mode 100644 index 0000000000..5fc4a562f8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestHashBasedBroadcastPolicyManager.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.manager; + +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy; +import org.junit.Before; + +/** + * Simple test of {@link HashBroadcastPolicyManager}. + */ +public class TestHashBasedBroadcastPolicyManager extends BasePolicyManagerTest { + + @Before + public void setup() { + // config policy + wfp = new HashBroadcastPolicyManager(); + wfp.setQueue("queue1"); + + // set expected params that the base test class will use for tests + expectedPolicyManager = HashBroadcastPolicyManager.class; + expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class; + expectedRouterPolicy = HashBasedRouterPolicy.class; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestPriorityBroadcastPolicyManager.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestPriorityBroadcastPolicyManager.java index 5e5bc83d00..21b39e909e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestPriorityBroadcastPolicyManager.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import java.util.HashMap; import java.util.Map; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestUniformBroadcastPolicyManager.java similarity index 95% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestUniformBroadcastPolicyManager.java index 542a5ae621..57fafdc79b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestUniformBroadcastPolicyManager.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java index ab9cec411d..5166147300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/manager/TestWeightedLocalityPolicyManager.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.yarn.server.federation.policies; +package org.apache.hadoop.yarn.server.federation.policies.manager; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java new file mode 100644 index 0000000000..2e7a0afb8f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +/** + * Base class for router policies tests, tests for null input cases. + */ +public abstract class BaseRouterPoliciesTest + extends BaseFederationPoliciesTest { + + @Test + public void testNullQueueRouting() throws YarnException { + FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); + ApplicationSubmissionContext applicationSubmissionContext = + ApplicationSubmissionContext.newInstance(null, null, null, null, null, + false, false, 0, Resources.none(), null, false, null, null); + SubClusterId chosen = + localPolicy.getHomeSubcluster(applicationSubmissionContext); + Assert.assertNotNull(chosen); + } + + @Test(expected = FederationPolicyException.class) + public void testNullAppContext() throws YarnException { + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java new file mode 100644 index 0000000000..af7fe43526 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.policies.router; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Simple test class for the {@link HashBasedRouterPolicy}. Tests that one of + * the active sub-cluster is chosen. + */ +public class TestHashBasedRouterPolicy extends BaseRouterPoliciesTest { + + private int numSubclusters = 10; + + @Before + public void setUp() throws Exception { + + // set policy in base class + setPolicy(new HashBasedRouterPolicy()); + + // setting up the active sub-clusters for this test + setMockActiveSubclusters(numSubclusters); + + // initialize policy with context + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), getActiveSubclusters()); + } + + @Test + public void testHashSpreadUniformlyAmongSubclusters() throws YarnException { + SubClusterId chosen; + + Map counter = new HashMap<>(); + for (SubClusterId id : getActiveSubclusters().keySet()) { + counter.put(id, new AtomicLong(0)); + } + + long jobPerSub = 100; + + ApplicationSubmissionContext applicationSubmissionContext = + mock(ApplicationSubmissionContext.class); + for (int i = 0; i < jobPerSub * numSubclusters; i++) { + when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i); + chosen = ((FederationRouterPolicy) getPolicy()) + .getHomeSubcluster(applicationSubmissionContext); + counter.get(chosen).addAndGet(1); + } + + // hash spread the jobs equally among the subclusters + for (AtomicLong a : counter.values()) { + Assert.assertEquals(a.get(), jobPerSub); + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index 906e35fe78..b70b4aae04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -36,7 +35,7 @@ * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load * is properly considered for allocation. */ -public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest { +public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest { @Before public void setUp() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index eefcfd9c55..42d919d461 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -38,7 +37,7 @@ * Simple test class for the {@link PriorityRouterPolicy}. Tests that the * weights are correctly used for ordering the choice of sub-clusters. */ -public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest { +public class TestPriorityRouterPolicy extends BaseRouterPoliciesTest { @Before public void setUp() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java index ac41ab550d..b45aa2a0f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -21,7 +21,6 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -36,7 +35,7 @@ * Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one * of the active subcluster is chosen. */ -public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest { +public class TestUniformRandomRouterPolicy extends BaseRouterPoliciesTest { @Before public void setUp() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index 78967d0209..34cc5f8aac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -41,7 +41,7 @@ * number of randomized tests to check we are weighiting correctly even if * clusters go inactive. */ -public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest { +public class TestWeightedRandomRouterPolicy extends BaseRouterPoliciesTest { @Before public void setUp() throws Exception { @@ -78,13 +78,18 @@ public void setUp() throws Exception { @Test public void testClusterChosenWithRightProbability() throws YarnException { + ApplicationSubmissionContext context = + mock(ApplicationSubmissionContext.class); + when(context.getQueue()).thenReturn("queue1"); + setApplicationSubmissionContext(context); + Map counter = new HashMap<>(); for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights() .keySet()) { counter.put(id.toId(), new AtomicLong(0)); } - float numberOfDraws = 1000000; + float numberOfDraws = 100000; for (float i = 0; i < numberOfDraws; i++) { SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) @@ -113,8 +118,8 @@ public void testClusterChosenWithRightProbability() throws YarnException { Assert.assertTrue( "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight + " expected weight: " + expectedWeight, - expectedWeight == 0 || (actualWeight / expectedWeight) < 1.1 - && (actualWeight / expectedWeight) > 0.9); + expectedWeight == 0 || (actualWeight / expectedWeight) < 1.2 + && (actualWeight / expectedWeight) > 0.8); } else { Assert .assertTrue(