From e6ec02001fc4eed9eb51c8653d8f931135e49eda Mon Sep 17 00:00:00 2001 From: Eric Payne Date: Thu, 2 Nov 2017 12:37:33 -0500 Subject: [PATCH] =?UTF-8?q?YARN-7370:=20Preemption=20properties=20should?= =?UTF-8?q?=20be=20refreshable.=20Contributed=20by=20Gergely=20Nov=C3=A1k.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../monitor/SchedulingMonitor.java | 15 +++- .../ProportionalCapacityPreemptionPolicy.java | 79 ++++++++++++++----- .../CapacitySchedulerConfiguration.java | 11 +-- ...tProportionalCapacityPreemptionPolicy.java | 42 ++++++++-- .../TestCapacitySchedulerLazyPreemption.java | 2 +- 5 files changed, 115 insertions(+), 34 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 631d1a0f83..2a741ed83c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -73,9 +73,13 @@ public Thread newThread(Runnable r) { return t; } }); + schedulePreemptionChecker(); + super.serviceStart(); + } + + private void schedulePreemptionChecker() { handler = ses.scheduleAtFixedRate(new PreemptionChecker(), 0, monitorInterval, TimeUnit.MILLISECONDS); - super.serviceStart(); } @Override @@ -98,8 +102,13 @@ private class PreemptionChecker implements Runnable { @Override public void run() { try { - //invoke the preemption policy - invokePolicy(); + if (monitorInterval != 
scheduleEditPolicy.getMonitoringInterval()) { + handler.cancel(true); + monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + schedulePreemptionChecker(); + } else { + invokePolicy(); + } } catch (Throwable t) { // The preemption monitor does not alter structures nor do structures // persist across invocations. Therefore, log, skip, and retry. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index c4c98e2302..2c072d2544 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -108,6 +108,9 @@ public enum IntraQueuePreemptionOrderPolicy { private float minimumThresholdForIntraQueuePreemption; private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; + // Current configuration + private CapacitySchedulerConfiguration csConfig; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -121,8 +124,7 @@ public enum IntraQueuePreemptionOrderPolicy { new HashMap<>(); private Map> partitionToUnderServedQueues = new HashMap>(); - private List - candidatesSelectionPolicies = new ArrayList<>(); + private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; @@ -160,68 +162,76 @@ public void init(Configuration config, RMContext context, } 
rmContext = context; scheduler = (CapacityScheduler) sched; - CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + updateConfigIfNeeded(); + } - maxIgnoredOverCapacity = csConfig.getDouble( + private void updateConfigIfNeeded() { + CapacitySchedulerConfiguration config = scheduler.getConfiguration(); + if (config == csConfig) { + return; + } + + maxIgnoredOverCapacity = config.getDouble( CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY); - naturalTerminationFactor = csConfig.getDouble( + naturalTerminationFactor = config.getDouble( CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR); - maxWaitTime = csConfig.getLong( + maxWaitTime = config.getLong( CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL); - monitoringInterval = csConfig.getLong( + monitoringInterval = config.getLong( CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL); - percentageClusterPreemptionAllowed = csConfig.getFloat( + percentageClusterPreemptionAllowed = config.getFloat( CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND); - observeOnly = csConfig.getBoolean( + observeOnly = config.getBoolean( CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY); - lazyPreempionEnabled = csConfig.getBoolean( - CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + lazyPreempionEnabled = config.getBoolean( + CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, 
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); - maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( + maxAllowableLimitForIntraQueuePreemption = config.getFloat( CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, CapacitySchedulerConfiguration. DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); - minimumThresholdForIntraQueuePreemption = csConfig.getFloat( + minimumThresholdForIntraQueuePreemption = config.getFloat( CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, CapacitySchedulerConfiguration. DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy - .valueOf(csConfig + .valueOf(config .get( CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) .toUpperCase()); - rc = scheduler.getResourceCalculator(); - nlm = scheduler.getRMContext().getNodeLabelManager(); + candidatesSelectionPolicies = new ArrayList<>(); // Do we need white queue-priority preemption policy? boolean isQueuePriorityPreemptionEnabled = - csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); + config.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); if (isQueuePriorityPreemptionEnabled) { candidatesSelectionPolicies.add( new QueuePriorityContainerCandidateSelector(this)); } // Do we need to specially consider reserved containers? - boolean selectCandidatesForResevedContainers = csConfig.getBoolean( + boolean selectCandidatesForResevedContainers = config.getBoolean( CapacitySchedulerConfiguration. PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, CapacitySchedulerConfiguration. 
@@ -231,7 +241,7 @@ public void init(Configuration config, RMContext context, .add(new ReservedContainerCandidatesSelector(this)); } - boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean( + boolean additionalPreemptionBasedOnReservedResource = config.getBoolean( CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS); @@ -240,12 +250,39 @@ public void init(Configuration config, RMContext context, additionalPreemptionBasedOnReservedResource)); // Do we need to specially consider intra queue - boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( + boolean isIntraQueuePreemptionEnabled = config.getBoolean( CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); if (isIntraQueuePreemptionEnabled) { candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); } + + LOG.info("Capacity Scheduler configuration changed, updated preemption " + + "properties to:\n" + + "max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" + + "natural_termination_factor = " + naturalTerminationFactor + "\n" + + "max_wait_before_kill = " + maxWaitTime + "\n" + + "monitoring_interval = " + monitoringInterval + "\n" + + "total_preemption_per_round = " + percentageClusterPreemptionAllowed + + "\n" + + "observe_only = " + observeOnly + "\n" + + "lazy-preemption-enabled = " + lazyPreempionEnabled + "\n" + + "intra-queue-preemption.enabled = " + isIntraQueuePreemptionEnabled + + "\n" + + "intra-queue-preemption.max-allowable-limit = " + + maxAllowableLimitForIntraQueuePreemption + "\n" + + "intra-queue-preemption.minimum-threshold = " + + minimumThresholdForIntraQueuePreemption + "\n" + + "intra-queue-preemption.preemption-order-policy = " + + intraQueuePreemptionOrderPolicy + "\n" + + "priority-utilization.underutilized-preemption.enabled = " + 
+ isQueuePriorityPreemptionEnabled + "\n" + + "select_based_on_reserved_containers = " + + selectCandidatesForResevedContainers + "\n" + + "additional_res_balance_based_on_reserved_containers = " + + additionalPreemptionBasedOnReservedResource); + + csConfig = config; } @Override @@ -255,6 +292,8 @@ public ResourceCalculator getResourceCalculator() { @Override public synchronized void editSchedule() { + updateConfigIfNeeded(); + long startTs = clock.getTime(); CSQueue root = scheduler.getRootQueue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ecf5f..bfead35934 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -296,7 +296,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur "reservation-enforcement-window"; @Private - public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; + public static final String LAZY_PREEMPTION_ENABLED = + PREFIX + "lazy-preemption-enabled"; @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; @@ -1166,7 +1167,7 @@ public void setOrderingPolicyParameter(String queue, } public boolean getLazyPreemptionEnabled() { - return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); + return 
getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED); } private static final String PREEMPTION_CONFIG_PREFIX = @@ -1207,7 +1208,7 @@ public boolean getLazyPreemptionEnabled() { * completions) it might prevent convergence to guaranteed capacity. */ public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY = PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity"; - public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f; + public static final double DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1; /** * Given a computed preemption target, account for containers naturally * expiring and preempt only this percentage of the delta. This determines @@ -1217,8 +1218,8 @@ public boolean getLazyPreemptionEnabled() { * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR = PREEMPTION_CONFIG_PREFIX + "natural_termination_factor"; - public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = - 0.2f; + public static final double DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = + 0.2; /** * By default, reserved resource will be excluded while balancing capacities diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index a14a2b1368..694be09ee8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -187,9 +187,7 @@ public void setup() { appAlloc = 0; } - @Test - public void testIgnore() { - int[][] qData = new int[][]{ + private static final int[][] Q_DATA_FOR_IGNORE = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs { 100, 100, 100, 100 }, // maxCap @@ -199,8 +197,12 @@ public void testIgnore() { { 3, 1, 1, 1 }, // apps { -1, 1, 1, 1 }, // req granularity { 3, 0, 0, 0 }, // subqueues - }; - ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + }; + + @Test + public void testIgnore() { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); policy.editSchedule(); // don't correct imbalances without demand verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); @@ -1033,6 +1035,36 @@ public void testPreemptionNotHappenForSingleReservedQueue() { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } + + @Test + public void testRefreshPreemptionProperties() throws Exception { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); + + assertEquals( + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL, + policy.getMonitoringInterval()); + assertEquals( + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY, + policy.isObserveOnly()); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + long newMonitoringInterval = 5000; + boolean newObserveOnly = true; + newConf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + newMonitoringInterval); + newConf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, + newObserveOnly); + when(mCS.getConfiguration()).thenReturn(newConf); + + policy.editSchedule(); + + assertEquals(newMonitoringInterval, 
policy.getMonitoringInterval()); + assertEquals(newObserveOnly, policy.isObserveOnly()); + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java index e7157b8654..4e4e3c2064 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java @@ -54,7 +54,7 @@ public class TestCapacitySchedulerLazyPreemption @Before public void setUp() throws Exception { super.setUp(); - conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, true); }