From 4cfec943b177e2123a935e70d39776521883c2bc Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Wed, 21 Jun 2017 19:08:47 -0700 Subject: [PATCH] YARN-6724. Add ability to blacklist sub-clusters when invoking Routing policies. (Giovanni Matteo Fumarola via Subru). (cherry picked from commit f8e5de59697cb78686f0e605dc7e93628b5f3297) --- .../policies/RouterPolicyFacade.java | 15 +++++-- .../router/FederationRouterPolicy.java | 18 +++++--- .../router/HashBasedRouterPolicy.java | 22 ++++++++-- .../router/LoadBasedRouterPolicy.java | 7 ++- .../policies/router/PriorityRouterPolicy.java | 7 ++- .../policies/router/RejectRouterPolicy.java | 26 +++++++---- .../router/UniformRandomRouterPolicy.java | 23 ++++++++-- .../router/WeightedRandomRouterPolicy.java | 11 ++++- .../policies/BaseFederationPoliciesTest.java | 2 +- .../policies/TestRouterPolicyFacade.java | 12 +++--- .../router/BaseRouterPoliciesTest.java | 43 ++++++++++++++++++- .../router/TestHashBasedRouterPolicy.java | 2 +- .../router/TestLoadBasedRouterPolicy.java | 2 +- .../router/TestPriorityRouterPolicy.java | 2 +- .../router/TestRejectRouterPolicy.java | 4 +- .../router/TestUniformRandomRouterPolicy.java | 2 +- .../TestWeightedRandomRouterPolicy.java | 2 +- 17 files changed, 157 insertions(+), 43 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 5e31a088f1..44c1b10a72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -110,16 +111,22 @@ public RouterPolicyFacade(YarnConfiguration conf, * This method provides a wrapper of all policy functionalities for routing . * Internally it manages configuration changes, and policy init/reinit. * - * @param appSubmissionContext the application to route. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. * - * @return the id of the subcluster that will be the "home" for this + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return the {@link SubClusterId} that will be the "home" for this * application. * * @throws YarnException if there are issues initializing policies, or no * valid sub-cluster id could be found for this app. */ public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // the maps are concurrent, but we need to protect from reset() // reinitialization mid-execution by creating a new reference local to this @@ -186,7 +193,7 @@ public SubClusterId getHomeSubcluster( + "and no default specified."); } - return policy.getHomeSubcluster(appSubmissionContext); + return policy.getHomeSubcluster(appSubmissionContext, blackListSubClusters); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index 90ea0a87ce..9325bd8ca2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy; @@ -29,16 +31,22 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy { /** - * Determines the sub-cluster that the user application submision should be + * Determines the sub-cluster that the user application submission should be * routed to. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. * - * @return the sub-cluster as identified by {@link SubClusterId} to route the - * request to. + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return the {@link SubClusterId} that will be the "home" for this + * application. * * @throws YarnException if the policy cannot determine a viable subcluster. */ SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException; + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java index e40e87ea3a..257a9fef67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -55,19 +55,35 @@ public void reinitialize( * sub-cluster, as far as the number of active sub-cluster and their names * remain the same. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. * - * @return a hash-based chosen subcluster. + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * + * @return a hash-based chosen {@link SubClusterId} that will be the "home" + * for this application. * * @throws YarnException if there are no active subclusters. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // throws if no active subclusters available Map activeSubclusters = getActiveSubclusters(); + if (blackListSubClusters != null) { + + // Remove from the active SubClusters from StateStore the blacklisted ones + for (SubClusterId scId : blackListSubClusters) { + activeSubclusters.remove(scId); + } + } + validate(appSubmissionContext); int chosenPosition = Math.abs( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index 2ca15bf045..c12400194a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -62,7 +63,8 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -76,6 +78,9 @@ public SubClusterId getHomeSubcluster( long currBestMem = -1; for (Map.Entry entry : activeSubclusters .entrySet()) { + if (blacklist != null && blacklist.contains(entry.getKey())) { + continue; + } SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey()); if (weights.containsKey(id) && weights.get(id) > 0) { long availableMemory = getAvailableMemory(entry.getValue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index 13d9140243..59f8767970 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -50,6 +52,9 @@ public SubClusterId getHomeSubcluster( Float currentBest = Float.MIN_VALUE; for (SubClusterId id : activeSubclusters.keySet()) { SubClusterIdInfo idInfo = new SubClusterIdInfo(id); + if (blacklist != null && blacklist.contains(id)) { + continue; + } if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) { currentBest = weights.get(idInfo); chosen = id; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java index faf3279ecd..b4c0192702 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/RejectRouterPolicy.java @@ -17,6 +17,8 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -27,8 +29,8 @@ /** * This {@link FederationRouterPolicy} simply rejects all incoming requests. - * This is useful to prevent applications running in a queue to be run - * anywhere in the federated cluster. + * This is useful to prevent applications running in a queue to be run anywhere + * in the federated cluster. */ public class RejectRouterPolicy extends AbstractRouterPolicy { @@ -44,23 +46,31 @@ public void reinitialize( /** * The policy always reject requests. * - * @param appSubmissionContext the context for the app being submitted. + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. * * @return (never). * - * @throws YarnException (always) to prevent applications in this queue to - * be run anywhere in the federated cluster. + * @throws YarnException (always) to prevent applications in this queue to be + * run anywhere in the federated cluster. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // run standard validation, as error might differ validate(appSubmissionContext); throw new FederationPolicyException("The policy configured for this queue" + " (" + appSubmissionContext.getQueue() + ") reject all routing " - + "requests by construction. Application " + appSubmissionContext - .getApplicationId() + " cannot be routed to any RM."); + + "requests by construction. Application " + + appSubmissionContext.getApplicationId() + + " cannot be routed to any RM."); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index d8204498ea..bc729b7409 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -59,18 +59,24 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) } /** - * Simply picks a random active subcluster to start the AM (this does NOT + * Simply picks a random active subCluster to start the AM (this does NOT * depend on the weights in the policy). * - * @param appSubmissionContext the context for the app being submitted - * (ignored). + * @param appSubmissionContext the {@link ApplicationSubmissionContext} that + * has to be routed to an appropriate subCluster for execution. + * + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. * * @return a randomly chosen subcluster. * * @throws YarnException if there are no active subclusters. */ + @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blackListSubClusters) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -79,6 +85,15 @@ public SubClusterId getHomeSubcluster( getActiveSubclusters(); List list = new ArrayList<>(activeSubclusters.keySet()); + + if (blackListSubClusters != null) { + + // Remove from the active SubClusters from StateStore the blacklisted ones + for (SubClusterId scId : blackListSubClusters) { + list.remove(scId); + } + } + return list.get(rand.nextInt(list.size())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index 5727134a83..7f230a711e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.List; import java.util.Map; import java.util.Random; @@ -41,7 +42,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { @Override public SubClusterId getHomeSubcluster( - ApplicationSubmissionContext appSubmissionContext) throws YarnException { + ApplicationSubmissionContext appSubmissionContext, + List blacklist) throws YarnException { // null checks and default-queue behavior validate(appSubmissionContext); @@ -58,6 +60,9 @@ public SubClusterId getHomeSubcluster( float totActiveWeight = 0; for (Map.Entry entry : weights.entrySet()) { + if (blacklist != null && blacklist.contains(entry.getKey().toId())) { + continue; + } if (entry.getKey() != null && activeSubclusters.containsKey(entry.getKey().toId())) { totActiveWeight += entry.getValue(); @@ -66,6 +71,9 @@ public SubClusterId getHomeSubcluster( float lookupValue = rand.nextFloat() * totActiveWeight; for (SubClusterId id : activeSubclusters.keySet()) { + if (blacklist != null && blacklist.contains(id)) { + continue; + } SubClusterIdInfo idInfo = new SubClusterIdInfo(id); if (weights.containsKey(idInfo)) { lookupValue -= weights.get(idInfo); @@ -77,4 +85,5 @@ public SubClusterId getHomeSubcluster( // should never happen return null; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 6bd8bf0d96..23978ed886 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -104,7 +104,7 @@ public void testNoSubclusters() throws YarnException { ConfigurableFederationPolicy localPolicy = getPolicy(); if (localPolicy instanceof FederationRouterPolicy) { ((FederationRouterPolicy) localPolicy) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); } else { String[] hosts = new String[] {"host1", "host2"}; List resourceRequests = FederationPoliciesTestUtil diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java index 5fa02d60f4..d0e2decb2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java @@ -95,7 +95,7 @@ public void testConfigurationUpdate() throws YarnException { // first call runs using standard UniformRandomRouterPolicy SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertTrue(routerFacade.globalPolicyMap .get(queue1) instanceof UniformRandomRouterPolicy); @@ -107,7 +107,7 @@ public void testConfigurationUpdate() throws YarnException { .newInstance(getPriorityPolicy(queue1))); // second call is routed by new policy PriorityRouterPolicy - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(chosen.equals(subClusterIds.get(0))); Assert.assertTrue(routerFacade.globalPolicyMap .get(queue1) instanceof PriorityRouterPolicy); @@ -126,7 +126,7 @@ public void testGetHomeSubcluster() throws YarnException { // when invoked it returns the expected SubClusterId. SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); // now the caching of policies must have added an entry for this queue @@ -160,19 +160,19 @@ public void testFallbacks() throws YarnException { String uninitQueue = "non-initialized-queue"; when(applicationSubmissionContext.getQueue()).thenReturn(uninitQueue); SubClusterId chosen = - routerFacade.getHomeSubcluster(applicationSubmissionContext); + routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); // empty string when(applicationSubmissionContext.getQueue()).thenReturn(""); - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); // null queue also falls back to default when(applicationSubmissionContext.getQueue()).thenReturn(null); - chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext); + chosen = routerFacade.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertTrue(subClusterIds.contains(chosen)); Assert.assertFalse(routerFacade.globalPolicyMap.containsKey(uninitQueue)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java index 2e7a0afb8f..c7a7767794 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -18,11 +18,19 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; + import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -40,12 +48,43 @@ public void testNullQueueRouting() throws YarnException { ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, Resources.none(), null, false, null, null); SubClusterId chosen = - localPolicy.getHomeSubcluster(applicationSubmissionContext); + localPolicy.getHomeSubcluster(applicationSubmissionContext, null); Assert.assertNotNull(chosen); } @Test(expected = FederationPolicyException.class) public void testNullAppContext() throws YarnException { - ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null); + ((FederationRouterPolicy) getPolicy()).getHomeSubcluster(null, null); + } + + @Test + public void testBlacklistSubcluster() throws YarnException { + FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); + ApplicationSubmissionContext applicationSubmissionContext = + ApplicationSubmissionContext.newInstance(null, null, null, null, null, + false, false, 0, Resources.none(), null, false, null, null); + Map activeSubClusters = + getActiveSubclusters(); + if (activeSubClusters != null && activeSubClusters.size() > 1 + && !(localPolicy instanceof RejectRouterPolicy)) { + // blacklist all the active subcluster but one. + Random random = new Random(); + List blacklistSubclusters = + new ArrayList(activeSubClusters.keySet()); + SubClusterId removed = blacklistSubclusters + .remove(random.nextInt(blacklistSubclusters.size())); + // bias LoadBasedRouterPolicy + getPolicyInfo().getRouterPolicyWeights() + .put(new SubClusterIdInfo(removed), 1.0f); + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), getActiveSubclusters()); + + SubClusterId chosen = localPolicy.getHomeSubcluster( + applicationSubmissionContext, blacklistSubclusters); + + // check that the selected sub-cluster is only one not blacklisted + Assert.assertNotNull(chosen); + Assert.assertEquals(removed, chosen); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java index af7fe43526..ee3e09d2b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java @@ -70,7 +70,7 @@ public void testHashSpreadUniformlyAmongSubclusters() throws YarnException { for (int i = 0; i < jobPerSub * numSubclusters; i++) { when(applicationSubmissionContext.getQueue()).thenReturn("queue" + i); chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(applicationSubmissionContext); + .getHomeSubcluster(applicationSubmissionContext, null); counter.get(chosen).addAndGet(1); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index b70b4aae04..dc8f99bfce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -97,7 +97,7 @@ private String generateClusterMetricsInfo(int id) { public void testLoadIsRespected() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); // check the "planted" best cluster is chosen Assert.assertEquals("sc05", chosen.getId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index 42d919d461..3c036c1812 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -78,7 +78,7 @@ public void setUp() throws Exception { @Test public void testPickLowestWeight() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); Assert.assertEquals("sc5", chosen.getId()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java index 049ebbfc46..1747f73715 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestRejectRouterPolicy.java @@ -47,7 +47,7 @@ public void setUp() throws Exception { @Test(expected = FederationPolicyException.class) public void testNoClusterIsChosen() throws YarnException { ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); } @Override @@ -57,7 +57,7 @@ public void testNullQueueRouting() throws YarnException { ApplicationSubmissionContext applicationSubmissionContext = ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, Resources.none(), null, false, null, null); - localPolicy.getHomeSubcluster(applicationSubmissionContext); + localPolicy.getHomeSubcluster(applicationSubmissionContext, null); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java index b45aa2a0f8..05490aba67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -57,7 +57,7 @@ public void setUp() throws Exception { @Test public void testOneSubclusterIsChosen() throws YarnException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index 09173e6ee4..c969a30e65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -98,7 +98,7 @@ public void testClusterChosenWithRightProbability() throws YarnException { for (float i = 0; i < numberOfDraws; i++) { SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()) - .getHomeSubcluster(getApplicationSubmissionContext()); + .getHomeSubcluster(getApplicationSubmissionContext(), null); counter.get(chosenId).incrementAndGet(); }