From 384764cdeac6490bc47fa0eb7b936baa4c0d3230 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Thu, 28 Jun 2018 12:39:49 -0400 Subject: [PATCH 1/2] YARN-8409. Fixed NPE in ActiveStandbyElectorBasedElectorService. Contributed by Chandni Singh --- .../hadoop/ha/ActiveStandbyElector.java | 5 ++++- .../hadoop/ha/ZKFailoverController.java | 2 +- .../TestRMEmbeddedElector.java | 22 +++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index a23fb71e41..d099ca71ac 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -329,9 +329,12 @@ public synchronized boolean parentZNodeExists() * This recursively creates the znode as well as all of its parents. */ public synchronized void ensureParentZNode() - throws IOException, InterruptedException { + throws IOException, InterruptedException, KeeperException { Preconditions.checkState(!wantToBeInElection, "ensureParentZNode() may not be called while in the election"); + if (zkClient == null) { + createConnection(); + } String pathParts[] = znodeWorkingDir.split("/"); Preconditions.checkArgument(pathParts.length >= 1 && diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 9295288fb7..f66e3c9749 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -269,7 +269,7 @@ private void printUsage() { } private int formatZK(boolean force, boolean interactive) - throws IOException, InterruptedException { + throws IOException, InterruptedException, KeeperException { if (elector.parentZNodeExists()) { if (!force && (!interactive || !confirmFormat())) { return ERR_CODE_FORMAT_DENIED; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 9d38149937..8c038618c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -22,8 +22,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -304,6 +306,26 @@ private void testCallbackSynchronizationTimingStandby(AdminService as, verify(as, times(1)).transitionToStandby(any()); } + /** + * Test that 
active elector service triggers a fatal RM Event when connection + * to ZK fails. YARN-8409 + */ + @Test + public void testFailureToConnectToZookeeper() throws Exception { + stopServer(); + Configuration myConf = new Configuration(conf); + ResourceManager rm = new MockRM(conf); + + ActiveStandbyElectorBasedElectorService ees = + new ActiveStandbyElectorBasedElectorService(rm); + try { + ees.init(myConf); + Assert.fail("expect failure to connect to Zookeeper"); + } catch (ServiceStateException sse) { + Assert.assertTrue(sse.getMessage().contains("ConnectionLoss")); + } + } + private class MockRMWithElector extends MockRM { private long delayMs = 0; From 291194302cc1a875d6d94ea93cf1184a3f1fc2cc Mon Sep 17 00:00:00 2001 From: Sunil G Date: Thu, 28 Jun 2018 10:23:31 -0700 Subject: [PATCH 2/2] YARN-8379. Improve balancing resources in already satisfied queues by using Capacity Scheduler preemption. Contributed by Zian Chen. --- ...AbstractPreemptableResourceCalculator.java | 21 +- .../CapacitySchedulerPreemptionContext.java | 2 + .../CapacitySchedulerPreemptionUtils.java | 23 +- .../capacity/FifoCandidatesSelector.java | 45 ++-- .../IntraQueueCandidatesSelector.java | 9 +- .../PreemptableResourceCalculator.java | 7 +- .../PreemptionCandidatesSelector.java | 11 + .../ProportionalCapacityPreemptionPolicy.java | 127 ++++++--- ...euePriorityContainerCandidateSelector.java | 16 +- .../ReservedContainerCandidatesSelector.java | 16 +- .../capacity/TempQueuePerPartition.java | 8 +- .../CapacitySchedulerConfiguration.java | 17 ++ .../TestPreemptionForQueueWithPriorities.java | 58 ++++ ...acityPreemptionPolicyPreemptToBalance.java | 254 ++++++++++++++++++ ...stCapacitySchedulerSurgicalPreemption.java | 111 ++++++++ 15 files changed, 636 insertions(+), 89 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java index 64b36151d8..5b8360a1c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java @@ -42,6 +42,7 @@ public class AbstractPreemptableResourceCalculator { protected final ResourceCalculator rc; protected boolean isReservedPreemptionCandidatesSelector; private Resource stepFactor; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; static class TQComparator implements Comparator { private ResourceCalculator rc; @@ -83,15 +84,28 @@ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) { * this will be set by different implementation of candidate * selectors, please refer to TempQueuePerPartition#offer for * details. 
+ * @param allowQueuesBalanceAfterAllQueuesSatisfied + * Should resources be preempted from an over-served queue when the + * requesting queues are all at or over their guarantees? + * For example, suppose there are 10 queues under root, each with a + * guaranteed capacity of 10%. + * Assume only two queues are using resources: queueA uses 10% and + * queueB uses 90%. Every queue has its guarantee met, but the split + * is unfair to queueA. + * This behavior is therefore made configurable; by default it is + * not allowed. + * */ public AbstractPreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { context = preemptionContext; rc = preemptionContext.getResourceCalculator(); this.isReservedPreemptionCandidatesSelector = isReservedPreemptionCandidatesSelector; - + this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; stepFactor = Resource.newInstance(0, 0); for (ResourceInformation ri : stepFactor.getResources()) { ri.setValue(1); @@ -193,7 +207,8 @@ protected void computeFixpointAllocation(Resource totGuarant, wQavail = Resources.componentwiseMin(wQavail, unassigned); Resource wQidle = sub.offer(wQavail, rc, totGuarant, - isReservedPreemptionCandidatesSelector); + isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); Resource wQdone = Resources.subtract(wQavail, wQidle); if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java index 098acdd851..7985296fca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -70,6 +70,8 @@ TempQueuePerPartition getQueueByPartition(String queueName, float getMaxAllowableLimitForIntraQueuePreemption(); + long getDefaultMaximumKillWaitTimeout(); + @Unstable IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index 690eb0230e..ed50effee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -151,6 +151,7 @@ public static boolean tryPreemptContainerAndDeductResToObtain( Map resourceToObtainByPartitions, RMContainer rmContainer, Resource clusterResource, Map> preemptMap, + Map> curCandidates, Resource totalPreemptionAllowed, boolean conservativeDRF) { ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); @@ -218,7 +219,7 @@ public static boolean tryPreemptContainerAndDeductResToObtain( } // Add to preemptMap - addToPreemptMap(preemptMap, attemptId, rmContainer); + addToPreemptMap(preemptMap, curCandidates, attemptId, rmContainer); return true; } @@ -230,15 +231,23 @@ private static String getPartitionByNodeId( return context.getScheduler().getSchedulerNode(nodeId).getPartition(); } - private static void addToPreemptMap( + protected static void addToPreemptMap( Map> preemptMap, + Map> curCandidates, ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { - Set set = preemptMap.get(appAttemptId); - if (null == set) { - set = new HashSet<>(); - preemptMap.put(appAttemptId, set); + Set setForToPreempt = preemptMap.get(appAttemptId); + Set setForCurCandidates = curCandidates.get(appAttemptId); + if (null == setForToPreempt) { + setForToPreempt = new HashSet<>(); + preemptMap.put(appAttemptId, setForToPreempt); } - set.add(containerToPreempt); + setForToPreempt.add(containerToPreempt); + + if (null == setForCurCandidates) { + setForCurCandidates = new HashSet<>(); + curCandidates.put(appAttemptId, setForCurCandidates); + } + setForCurCandidates.add(containerToPreempt); } private static boolean preemptMapContains( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java index 3b2fcbb90d..c2735f15f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -42,19 +43,25 @@ public class FifoCandidatesSelector private static final Log LOG = LogFactory.getLog(FifoCandidatesSelector.class); private PreemptableResourceCalculator preemptableAmountCalculator; + private boolean allowQueuesBalanceAfterAllQueuesSatisfied; FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext, - boolean includeReservedResource) { + boolean includeReservedResource, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { super(preemptionContext); + this.allowQueuesBalanceAfterAllQueuesSatisfied = + allowQueuesBalanceAfterAllQueuesSatisfied; preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, includeReservedResource); + preemptionContext, 
includeReservedResource, + allowQueuesBalanceAfterAllQueuesSatisfied); } @Override public Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptionAllowed) { + Map> curCandidates = new HashMap<>(); // Calculate how much resources we need to preempt preemptableAmountCalculator.computeIdealAllocation(clusterResource, totalPreemptionAllowed); @@ -110,7 +117,7 @@ public Map> selectCandidates( boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, - clusterResource, selectedCandidates, + clusterResource, selectedCandidates, curCandidates, totalPreemptionAllowed, false); if (!preempted) { continue; @@ -134,7 +141,7 @@ public Map> selectCandidates( preemptFrom(fc, clusterResource, resToObtainByPartition, skippedAMContainerlist, skippedAMSize, selectedCandidates, - totalPreemptionAllowed); + curCandidates, totalPreemptionAllowed); } // Can try preempting AMContainers (still saving atmost @@ -145,15 +152,15 @@ public Map> selectCandidates( leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL), leafQueue.getMaxAMResourcePerQueuePercent()); - preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist, - resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, - totalPreemptionAllowed); + preemptAMContainers(clusterResource, selectedCandidates, curCandidates, + skippedAMContainerlist, resToObtainByPartition, skippedAMSize, + maxAMCapacityForThisQueue, totalPreemptionAllowed); } finally { leafQueue.getReadLock().unlock(); } } - return selectedCandidates; + return curCandidates; } /** @@ -169,6 +176,7 @@ public Map> selectCandidates( */ private void preemptAMContainers(Resource clusterResource, Map> preemptMap, + Map> curCandidates, List skippedAMContainerlist, Map resToObtainByPartition, Resource skippedAMSize, Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) { @@ -187,7 +195,7 @@ private void preemptAMContainers(Resource clusterResource, boolean preempted = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, preemptMap, - totalPreemptionAllowed, false); + curCandidates, totalPreemptionAllowed, false); if (preempted) { Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); } @@ -203,6 +211,7 @@ private void preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Map resToObtainByPartition, List skippedAMContainerlist, Resource skippedAMSize, Map> selectedContainers, + Map> curCandidates, Resource totalPreemptionAllowed) { ApplicationAttemptId appId = app.getApplicationAttemptId(); @@ -219,9 +228,10 @@ private void preemptFrom(FiCaSchedulerApp app, } // Try to preempt this container - CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( - rc, preemptionContext, resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed, false); + CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, selectedContainers, + curCandidates, totalPreemptionAllowed, false); if (!preemptionContext.isObserveOnly()) { preemptionContext.getRMContext().getDispatcher().getEventHandler() @@ -262,9 +272,14 @@ private void preemptFrom(FiCaSchedulerApp app, } // Try to preempt this container - CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain( - rc, preemptionContext, 
resToObtainByPartition, c, clusterResource, - selectedContainers, totalPreemptionAllowed, false); + CapacitySchedulerPreemptionUtils + .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, + resToObtainByPartition, c, clusterResource, selectedContainers, + curCandidates, totalPreemptionAllowed, false); } } + + public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() { + return allowQueuesBalanceAfterAllQueuesSatisfied; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java index 8ab9507adf..c52fd957c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java @@ -122,7 +122,7 @@ public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) { public Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { - + Map> curCandidates = new HashMap<>(); // 1. Calculate the abnormality within each queue one by one. computeIntraQueuePreemptionDemand( clusterResource, totalPreemptedResourceAllowed, selectedCandidates); @@ -182,7 +182,7 @@ public Map> selectCandidates( leafQueue.getReadLock().lock(); for (FiCaSchedulerApp app : apps) { preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates, - clusterResource, totalPreemptedResourceAllowed, + curCandidates, clusterResource, totalPreemptedResourceAllowed, resToObtainByPartition, rollingResourceUsagePerUser); } } finally { @@ -191,7 +191,7 @@ public Map> selectCandidates( } } - return selectedCandidates; + return curCandidates; } private void initializeUsageAndUserLimitForCompute(Resource clusterResource, @@ -211,6 +211,7 @@ private void initializeUsageAndUserLimitForCompute(Resource clusterResource, private void preemptFromLeastStarvedApp(LeafQueue leafQueue, FiCaSchedulerApp app, Map> selectedCandidates, + Map> curCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed, Map resToObtainByPartition, Map rollingResourceUsagePerUser) { @@ -270,7 +271,7 @@ private void preemptFromLeastStarvedApp(LeafQueue leafQueue, boolean ret = CapacitySchedulerPreemptionUtils .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext, resToObtainByPartition, c, clusterResource, selectedCandidates, - totalPreemptedResourceAllowed, true); + curCandidates, totalPreemptedResourceAllowed, true); // Subtract from respective user's resource usage once a container is // selected for preemption. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java index 08d834ea00..89a015e412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -48,11 +48,14 @@ public class PreemptableResourceCalculator * @param isReservedPreemptionCandidatesSelector this will be set by * different implementation of candidate selectors, please refer to * TempQueuePerPartition#offer for details. + * @param allowQueuesBalanceAfterAllQueuesSatisfied */ public PreemptableResourceCalculator( CapacitySchedulerPreemptionContext preemptionContext, - boolean isReservedPreemptionCandidatesSelector) { - super(preemptionContext, isReservedPreemptionCandidatesSelector); + boolean isReservedPreemptionCandidatesSelector, + boolean allowQueuesBalanceAfterAllQueuesSatisfied) { + super(preemptionContext, isReservedPreemptionCandidatesSelector, + allowQueuesBalanceAfterAllQueuesSatisfied); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java index 4d8afaf0c9..3c97364ec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java @@ -34,6 +34,7 @@ public abstract class PreemptionCandidatesSelector { protected CapacitySchedulerPreemptionContext preemptionContext; protected ResourceCalculator rc; + private long maximumKillWaitTime = -1; PreemptionCandidatesSelector( CapacitySchedulerPreemptionContext preemptionContext) { @@ -77,4 +78,14 @@ public int compare(RMContainer a, RMContainer b) { }); } + public long getMaximumKillWaitTimeMs() { + if (maximumKillWaitTime > 0) { + return maximumKillWaitTime; + } + return preemptionContext.getDefaultMaximumKillWaitTimeout(); + } + + public void setMaximumKillWaitTime(long maximumKillWaitTime) { + this.maximumKillWaitTime = maximumKillWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 
cc69fbae36..036fd2f314 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -131,6 +131,8 @@ public enum IntraQueuePreemptionOrderPolicy { private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; + Map>> pcsMap; // Preemptable Entities, synced from scheduler at every run private Map preemptableQueues; @@ -249,7 +251,21 @@ private void updateConfigIfNeeded() { // initialize candidates preemption selection policies candidatesSelectionPolicies.add(new FifoCandidatesSelector(this, - additionalPreemptionBasedOnReservedResource)); + additionalPreemptionBasedOnReservedResource, false)); + + // Do we need to do preemption to balance queue even after queues get satisfied? + boolean isPreemptionToBalanceRequired = config.getBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED); + long maximumKillWaitTimeForPreemptionToQueueBalance = config.getLong( + CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION, + CapacitySchedulerConfiguration.DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION); + if (isPreemptionToBalanceRequired) { + PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this, + false, true); + selector.setMaximumKillWaitTime(maximumKillWaitTimeForPreemptionToQueueBalance); + candidatesSelectionPolicies.add(selector); + } // Do we need to specially consider intra queue boolean isIntraQueuePreemptionEnabled = config.getBoolean( @@ -282,7 +298,8 @@ private void updateConfigIfNeeded() { "select_based_on_reserved_containers = " + selectCandidatesForResevedContainers + "\n" + "additional_res_balance_based_on_reserved_containers = " + - additionalPreemptionBasedOnReservedResource); + additionalPreemptionBasedOnReservedResource + "\n" + + "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired); csConfig = config; } @@ -308,44 +325,60 @@ public synchronized void editSchedule() { } private void preemptOrkillSelectedContainerAfterWait( - Map> selectedCandidates, - long currentTime) { + Map>> toPreemptPerSelector, long currentTime) { + int toPreemptCount = 0; + for (Map> containers : + toPreemptPerSelector.values()) { + toPreemptCount += containers.size(); + } if (LOG.isDebugEnabled()) { LOG.debug( "Starting to preempt containers for selectedCandidates and size:" - + selectedCandidates.size()); + + toPreemptCount); } // preempt (or kill) the selected containers - for (Map.Entry> e : selectedCandidates + // We need toPreemptPerSelector here to match list of containers to + // its selector so that we can get custom timeout per selector when + // checking if current container should be killed or not + for (Map.Entry>> pc : toPreemptPerSelector .entrySet()) { - ApplicationAttemptId appAttemptId = e.getKey(); - if (LOG.isDebugEnabled()) { - LOG.debug("Send to scheduler: in app=" + appAttemptId - + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); - } - for (RMContainer container : e.getValue()) { - // if we tried to preempt this for more than maxWaitTime - if 
(preemptionCandidates.get(container) != null - && preemptionCandidates.get(container) - + maxWaitTime <= currentTime) { - // kill it - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); - preemptionCandidates.remove(container); - } else { - if (preemptionCandidates.get(container) != null) { - // We already updated the information to scheduler earlier, we need - // not have to raise another event. - continue; + Map> cMap = pc.getValue(); + if (cMap.size() > 0) { + for (Map.Entry> e : cMap.entrySet()) { + ApplicationAttemptId appAttemptId = e.getKey(); + if (LOG.isDebugEnabled()) { + LOG.debug("Send to scheduler: in app=" + appAttemptId + + " #containers-to-be-preemptionCandidates=" + e.getValue().size()); } + for (RMContainer container : e.getValue()) { + // if we tried to preempt this for more than maxWaitTime, this + // should be based on custom timeout per container per selector + if (preemptionCandidates.get(container) != null + && preemptionCandidates.get(container) + + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) { + // kill it + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE)); + preemptionCandidates.remove(container); + } else { + if (preemptionCandidates.get(container) != null) { + // We already updated the information to scheduler earlier, we need + // not have to raise another event. + continue; + } - //otherwise just send preemption events - rmContext.getDispatcher().getEventHandler().handle( - new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); - preemptionCandidates.put(container, currentTime); + //otherwise just send preemption events + rmContext.getDispatcher().getEventHandler().handle( + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); + preemptionCandidates.put(container, currentTime); + } + } } } } @@ -438,6 +471,8 @@ private void containerBasedPreemptOrKill(CSQueue root, // queue and each application Map> toPreempt = new HashMap<>(); + Map>> toPreemptPerSelector = new HashMap<>();; for (PreemptionCandidatesSelector selector : candidatesSelectionPolicies) { long startTime = 0; @@ -447,20 +482,27 @@ private void containerBasedPreemptOrKill(CSQueue root, selector.getClass().getName())); startTime = clock.getTime(); } - toPreempt = selector.selectCandidates(toPreempt, - clusterResources, totalPreemptionAllowed); + Map> curCandidates = + selector.selectCandidates(toPreempt, clusterResources, + totalPreemptionAllowed); + toPreemptPerSelector.putIfAbsent(selector, curCandidates); if (LOG.isDebugEnabled()) { LOG.debug(MessageFormat .format("{0} uses {1} millisecond to run", selector.getClass().getName(), clock.getTime() - startTime)); int totalSelected = 0; + int curSelected = 0; for (Set set : toPreempt.values()) { totalSelected += set.size(); } + for (Set set : curCandidates.values()) { + curSelected += set.size(); + } LOG.debug(MessageFormat - .format("So far, total {0} containers selected to be preempted", - totalSelected)); + .format("So far, total {0} containers selected to be preempted, {1}" + + " containers selected this round\n", + totalSelected, curSelected)); } } @@ -483,8 +525,10 @@ private void containerBasedPreemptOrKill(CSQueue root, long currentTime = clock.getTime(); + pcsMap = toPreemptPerSelector; + // preempt (or kill) the selected 
containers - preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime); + preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime); // cleanup staled preemption candidates cleanupStaledPreemptionCandidates(currentTime); @@ -689,6 +733,12 @@ Map> getQueuePartitions() { return queueToPartitions; } + @VisibleForTesting + Map>> getToPreemptCandidatesPerSelector() { + return pcsMap; + } + @Override public int getClusterMaxApplicationPriority() { return scheduler.getMaxClusterLevelAppPriority().getPriority(); @@ -730,4 +780,9 @@ public void addPartitionToUnderServedQueues(String queueName, public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() { return intraQueuePreemptionOrderPolicy; } + + @Override + public long getDefaultMaximumKillWaitTimeout() { + return maxWaitTime; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java index 4a169af1a5..78a99881d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java @@ -380,6 +380,7 @@ public Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { + Map> curCandidates = new HashMap<>(); // Initialize digraph from queues // TODO (wangda): only do this when queue refreshed. priorityDigraph.clear(); @@ -388,7 +389,7 @@ public Map> selectCandidates( // When all queues are set to same priority, or priority is not respected, // direct return. 
if (priorityDigraph.isEmpty()) { - return selectedCandidates; + return curCandidates; } // Save parameters to be shared by other methods @@ -478,13 +479,9 @@ public Map> selectCandidates( .getReservedResource()); } - Set containers = selectedCandidates.get( - c.getApplicationAttemptId()); - if (null == containers) { - containers = new HashSet<>(); - selectedCandidates.put(c.getApplicationAttemptId(), containers); - } - containers.add(c); + // Add to preemptMap + CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates, + curCandidates, c.getApplicationAttemptId(), c); // Update totalPreemptionResourceAllowed Resources.subtractFrom(totalPreemptedResourceAllowed, @@ -504,7 +501,6 @@ public Map> selectCandidates( } } } - - return selectedCandidates; + return curCandidates; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java index ff100d9a6e..bdb7e8c4b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java @@ -31,7 +31,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -63,7 +62,7 @@ public NodeForPreemption(float preemptionCost, CapacitySchedulerPreemptionContext preemptionContext) { super(preemptionContext); preemptableAmountCalculator = new PreemptableResourceCalculator( - preemptionContext, true); + preemptionContext, true, false); } @Override @@ -71,6 +70,7 @@ public Map> selectCandidates( Map> selectedCandidates, Resource clusterResource, Resource totalPreemptedResourceAllowed) { + Map> curCandidates = new HashMap<>(); // Calculate how much resources we need to preempt preemptableAmountCalculator.computeIdealAllocation(clusterResource, totalPreemptedResourceAllowed); @@ -101,14 +101,10 @@ public Map> selectCandidates( selectedCandidates, totalPreemptedResourceAllowed, false); if (null != preemptionResult) { for (RMContainer c : preemptionResult.selectedContainers) { - ApplicationAttemptId appId = c.getApplicationAttemptId(); - Set containers = selectedCandidates.get(appId); - if (null == containers) { - containers = new HashSet<>(); - selectedCandidates.put(appId, containers); - } + // Add to preemptMap + CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates, + curCandidates, c.getApplicationAttemptId(), c); - containers.add(c); if (LOG.isDebugEnabled()) { LOG.debug(this.getClass().getName() + " Marked container=" + c .getContainerId() + " from queue=" + c.getQueueName() @@ -118,7 +114,7 @@ public Map> selectCandidates( } } - return selectedCandidates; + return curCandidates; } private Resource getPreemptableResource(String queueName, diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java index 4214acc552..4fb1862b88 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java @@ -138,7 +138,8 @@ public ArrayList getChildren() { // This function "accepts" all the resources it can (pending) and return // the unused ones Resource offer(Resource avail, ResourceCalculator rc, - Resource clusterResource, boolean considersReservedResource) { + Resource clusterResource, boolean considersReservedResource, + boolean allowQueueBalanceAfterAllSafisfied) { Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax( Resources.subtract(getMax(), idealAssigned), Resource.newInstance(0, 0)); @@ -179,7 +180,10 @@ Resource offer(Resource avail, ResourceCalculator rc, // leaf queues. Such under-utilized leaf queue could preemption resources // from over-utilized leaf queue located at other hierarchies. - accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + // Allow queues can continue grow and balance even if all queues are satisfied. + if (!allowQueueBalanceAfterAllSafisfied) { + accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted); + } // accepted so far contains the "quota acceptable" amount, we now filter by // locality acceptable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 76eaac0571..f94654e280 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1459,6 +1459,23 @@ public boolean getLazyPreemptionEnabled() { + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy"; public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; + /** + * Should we allow queues continue grow after all queue reaches their + * guaranteed capacity. + */ + public static final String PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = + PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled"; + public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = false; + + /** + * How long we will wait to balance queues, by default it is 5 mins. 
+ */ + public static final String MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION = + PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill"; + public static final long + DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION = + 300 * 1000; + /** * Maximum application for a queue to be used when application per queue is * not defined.To be consistent with previous version the default value is set diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java index 6a953cfe06..38c2a2a284 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java @@ -20,6 +20,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -538,4 +539,61 @@ public void test3ResourceTypesInterQueuePreemption() throws IOException { new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( getAppAttemptId(1)))); } + + @Test + public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues() + throws IOException { + /** + * All queues are beyond guarantee, c has higher priority than b. + * c ask for more resource, and there is no idle left, c should preempt + * some resource from b but won’t let b under its guarantee. + * + * Queue structure is: + * + *
+     * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+ * + * For priorities + * - a=1 + * - b=1 + * - c=2 + * + */ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 0 0]){priority=1};" + // a + "-b(=[30 100 40 50]){priority=1};" + // b + "-c(=[40 100 60 25]){priority=2}"; // c + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "b\t(1,1,n1,,40,false);" + // app1 in b + "c\t(1,1,n1,,60,false)"; // app2 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + boolean isPreemptionToBalanceRequired = true; + newConf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + isPreemptionToBalanceRequired); + when(cs.getConfiguration()).thenReturn(newConf); + policy.editSchedule(); + + // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40, even though + // b and c has same relativeAssigned=1.0f(idealAssigned / guaranteed), + // since c has higher priority, c will be put in mostUnderServedQueue and + // get all remain 30 capacity. + verify(mDisp, times(10)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, never()).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java new file mode 100644 index 0000000000..22e8f63951 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Test; +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyPreemptToBalance + extends ProportionalCapacityPreemptionPolicyMockFramework { + + @Test + public void testPreemptionToBalanceDisabled() throws IOException { + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 10 30]);" + // a + "-b(=[30 100 40 30]);" + // b + "-c(=[30 100 50 30]);" + // c + "-d(=[10 100 0 0])"; // d + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(30, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(35, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(35, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testPreemptionToBalanceEnabled() throws IOException { + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 10 30]);" + // a + "-b(=[30 100 40 30]);" + // b + "-c(=[30 100 50 30]);" + // c + "-d(=[10 100 0 0])"; // d + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + // enable preempt to balance and ideal assignment will change. 
+ boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + isPreemptionToBalanceEnabled); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A + verify(mDisp, times(7)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + verify(mDisp, times(17)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(33, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(33, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(33, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + + @Test + public void testPreemptionToBalanceUsedPlusPendingLessThanGuaranteed() + throws IOException{ + String labelsConfig = "=100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100 100 100 100]);" + //root + "-a(=[30 100 10 6]);" + // a + "-b(=[30 100 40 30]);" + // b + "-c(=[30 100 50 30]);" + // c + "-d(=[10 100 0 0])"; // d + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1,n1,,10,false);" + // app1 in a + "b\t(1,1,n1,,40,false);" + // app2 in b + "c\t(1,1,n1,,50,false)"; // app3 in c + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + isPreemptionToBalanceEnabled); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // I_A: A:15 B:42 C:43, preempt 7 from B and 17 from C to A + verify(mDisp, times(8)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(3)))); + + assertEquals(16, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(42, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(42, policy.getQueuePartitions().get("c") + .get("").getIdealAssigned().getMemorySize()); + } + + @Test + public void testPreemptionToBalanceWithVcoreResource() throws IOException { + Logger.getRootLogger().setLevel(Level.DEBUG); + String labelsConfig = "=100:100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:100 100:100 100:100 120:140]);" + //root + "-a(=[60:60 100:100 40:40 70:40]);" + // a + "-b(=[40:40 100:100 60:60 50:100])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1:1,n1,,40,false);" + // app1 in a + "b\t(1,1:1,n1,,60,false)"; // app2 in b + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + isPreemptionToBalanceEnabled); + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true); + policy.editSchedule(); + + // 21 containers will be preempted here + verify(mDisp, times(21)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy. 
+ IsPreemptionRequestFor(getAppAttemptId(2)))); + + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getVirtualCores()); + } + + @Test + public void testPreemptionToBalanceWithConfiguredTimeout() throws IOException { + Logger.getRootLogger().setLevel(Level.DEBUG); + String labelsConfig = "=100:100,true"; // default partition + String nodesConfig = "n1="; // only one node + String queuesConfig = + // guaranteed,max,used,pending + "root(=[100:100 100:100 100:100 120:140]);" + //root + "-a(=[60:60 100:100 40:40 70:40]);" + // a + "-b(=[40:40 100:100 60:60 50:100])"; // b + + String appsConfig = + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t(1,1:1,n1,,40,false);" + // app1 in a + "b\t(1,1:1,n1,,60,false)"; // app2 in b + + boolean isPreemptionToBalanceEnabled = true; + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + isPreemptionToBalanceEnabled); + final long FB_MAX_BEFORE_KILL = 60 *1000; + conf.setLong( + CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION, + FB_MAX_BEFORE_KILL); + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true); + policy.editSchedule(); + + Map>> pcps= policy.getToPreemptCandidatesPerSelector(); + + String FIFO_CANDIDATE_SELECTOR = "FifoCandidatesSelector"; + boolean hasFifoSelector = false; + for (Map.Entry>> pc : pcps.entrySet()) { + if (pc.getKey().getClass().getSimpleName().equals(FIFO_CANDIDATE_SELECTOR)) { + FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey(); + if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied() == true) { + hasFifoSelector = true; + assertEquals(pcs.getMaximumKillWaitTimeMs(), FB_MAX_BEFORE_KILL); + } + } + } + + assertEquals(hasFifoSelector, true); + + // 21 containers will be preempted here + verify(mDisp, times(21)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy. 
+ IsPreemptionRequestFor(getAppAttemptId(2)))); + + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(60, policy.getQueuePartitions().get("a") + .get("").getIdealAssigned().getVirtualCores()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getMemorySize()); + assertEquals(40, policy.getQueuePartitions().get("b") + .get("").getIdealAssigned().getVirtualCores()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 2aff82d090..800789af72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -1111,5 +1111,116 @@ public void testPreemptionForFragmentatedCluster() throws Exception { rm1.close(); } + @Test(timeout = 600000) + public void testPreemptionToBalanceWithCustomTimeout() throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+ * + * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. + * + * 2) app1 submit to queue-b, asks for 1G * 5 + * + * 3) app2 submit to queue-c, ask for one 4G container (for AM) + * + * After preemption, we should expect: + * 1. Preempt 4 containers from app1 + * 2. the selected containers will be killed after configured timeout. + * 3. AM of app2 successfully allocated. + */ + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED, + true); + conf.setLong( + CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION, + 20*1000); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration( + this.conf); + + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 38, new ArrayList()); + + // Do allocation for node1/node2 + for (int i = 0; i < 38; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 39 containers now + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(39, schedulerApp1.getLiveContainers().size()); + // 20 from n1 and 19 from n2 + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()), + am1.getApplicationAttemptId(), 20); + waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()), + am1.getApplicationAttemptId(), 19); + + + // Submit app2 to queue-c and asks for a 4G container for AM + RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c"); + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); + + // Call editSchedule: containers are selected to be preemption candidate + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); + editPolicy.editSchedule(); + Assert.assertEquals(4, editPolicy.getToPreemptContainers().size()); + + // check live containers immediately, nothing happen + Assert.assertEquals(39, schedulerApp1.getLiveContainers().size()); + + Thread.sleep(20*1000); + // Call editSchedule again: selected containers are killed + editPolicy.editSchedule(); + waitNumberOfLiveContainersFromApp(schedulerApp1, 35); + + // Call allocation, containers are reserved + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + waitNumberOfReservedContainersFromApp(schedulerApp2, 1); + + // Call editSchedule twice and allocation once, container should get allocated + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + int tick = 0; + while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + tick++; + Thread.sleep(100); + } + waitNumberOfReservedContainersFromApp(schedulerApp2, 0); + + rm1.close(); + + + } + }
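To see how the two new knobs introduced by YARN-8379 fit together, the short sketch below wires them up the same way the new tests in this patch do, using the constants added to CapacitySchedulerConfiguration. It is illustrative only: the PreemptionToBalanceExample class, the base YarnConfiguration, and the 60-second kill timeout are assumptions made for the example, not values prescribed by the patch.

// Illustrative sketch (not part of the patch): enable preemption-to-balance
// for queues that are already at or above their guarantees, mirroring the
// conf.setBoolean / conf.setLong calls used by the new tests above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class PreemptionToBalanceExample {
  public static CapacitySchedulerConfiguration configure() {
    Configuration base = new YarnConfiguration();
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(base);

    // Let the extra FifoCandidatesSelector keep balancing queues even after
    // every queue has reached its guarantee (off by default).
    conf.setBoolean(
        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
        true);

    // Kill balance-only preemption candidates after 60 seconds instead of the
    // 5 minute default (DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION).
    conf.setLong(
        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
        60 * 1000L);

    return conf;
  }
}

The same two settings can equally be placed in the scheduler configuration file; their full property names are the suffixes defined in CapacitySchedulerConfiguration appended to PREEMPTION_CONFIG_PREFIX (the yarn.resourcemanager.monitor.capacity.preemption. namespace, assuming that prefix is unchanged in this branch).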