diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 6c0a854122..accf901eb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -400,14 +400,31 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) } } + protected Configuration loadNewConfiguration() + throws IOException, YarnException { + // Retrieve yarn-site.xml in order to refresh scheduling monitor properties. + Configuration conf = getConfiguration(new Configuration(false), + YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); + // The reason we call Configuration#size() is because when getConfiguration + // is called, it invokes Configuration#addResource, which invokes + // Configuration#reloadConfiguration, which triggers the reload process in a + // lazy way: the properties will only be reloaded when they are needed rather + // than reloaded right after getConfiguration is called. So here we call + // Configuration#size() to force Configuration#getProps to be called and + // reload all the properties. 
+ conf.size(); + return conf; + } + @Private public void refreshQueues() throws IOException, YarnException { - rm.getRMContext().getScheduler().reinitialize(getConfig(), + Configuration conf = loadNewConfiguration(); + rm.getRMContext().getScheduler().reinitialize(conf, this.rm.getRMContext()); // refresh the reservation system ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); if (rSystem != null) { - rSystem.reinitialize(getConfig(), rm.getRMContext()); + rSystem.reinitialize(conf, rm.getRMContext()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 6f8a0a48ed..a0317f62cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.AuthInfo; @@ -67,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; @@ -113,8 +112,6 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.eclipse.jetty.webapp.WebAppContext; -import com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; @@ -711,8 +708,6 @@ protected void serviceInit(Configuration configuration) throws Exception { } } - createSchedulerMonitors(); - masterService = createApplicationMasterService(); addService(masterService) ; rmContext.setApplicationMasterService(masterService); @@ -811,30 +806,6 @@ protected void serviceStop() throws Exception { } } - - protected void createSchedulerMonitors() { - if (conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, - YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) { - LOG.info("Loading policy monitors"); - List policies = conf.getInstances( - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, - SchedulingEditPolicy.class); - if (policies.size() > 0) { - for (SchedulingEditPolicy policy : policies) { - LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); - // periodically check whether we need to take action to guarantee - // constraints - SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); - addService(mon); - } - } else { - LOG.warn("Policy monitors configured (" + - YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS + - ") but none specified (" + - YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")"); - } - } - } } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 2a741ed83c..09edb98787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import com.google.common.annotations.VisibleForTesting; @@ -58,6 +57,7 @@ public synchronized SchedulingEditPolicy getSchedulingEditPolicy() { } public void serviceInit(Configuration conf) throws Exception { + LOG.info("Initializing SchedulingMonitor=" + getName()); scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler()); this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); @@ -65,6 +65,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { + LOG.info("Starting SchedulingMonitor=" + getName()); assert !stopped : "starting when already stopped"; ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { public Thread newThread(Runnable r) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java new file mode 100644 index 0000000000..0cc700d9c1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitorManager.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Manages scheduling monitors. + */ +public class SchedulingMonitorManager { + private static final Log LOG = LogFactory.getLog( + SchedulingMonitorManager.class); + + private Map runningSchedulingMonitors = + new HashMap<>(); + private RMContext rmContext; + + private void updateSchedulingMonitors(Configuration conf, + boolean startImmediately) throws YarnException { + boolean monitorsEnabled = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + + if (!monitorsEnabled) { + if (!runningSchedulingMonitors.isEmpty()) { + // If monitors disabled while we have some running monitors, we should + // stop them. 
+ LOG.info("Scheduling Monitor disabled, stopping all services"); + stopAndRemoveAll(); + } + + return; + } + + // When monitor is enabled, loading policies + String[] configuredPolicies = conf.getStrings( + YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES); + if (configuredPolicies == null || configuredPolicies.length == 0) { + return; + } + + Set configurePoliciesSet = new HashSet<>(); + for (String s : configuredPolicies) { + configurePoliciesSet.add(s); + } + + // Add new monitor when needed + for (String s : configurePoliciesSet) { + if (!runningSchedulingMonitors.containsKey(s)) { + Class policyClass; + try { + policyClass = Class.forName(s); + } catch (ClassNotFoundException e) { + String message = "Failed to find class of specified policy=" + s; + LOG.warn(message); + throw new YarnException(message); + } + + if (SchedulingEditPolicy.class.isAssignableFrom(policyClass)) { + SchedulingEditPolicy policyInstance = + (SchedulingEditPolicy) ReflectionUtils.newInstance(policyClass, + null); + SchedulingMonitor mon = new SchedulingMonitor(rmContext, + policyInstance); + mon.init(conf); + if (startImmediately) { + mon.start(); + } + runningSchedulingMonitors.put(s, mon); + } else { + String message = + "Specified policy=" + s + " is not a SchedulingEditPolicy class."; + LOG.warn(message); + throw new YarnException(message); + } + } + } + + // Stop monitor when needed. 
+ Set disabledPolicies = Sets.difference( + runningSchedulingMonitors.keySet(), configurePoliciesSet); + for (String disabledPolicy : disabledPolicies) { + LOG.info("SchedulingEditPolicy=" + disabledPolicy + + " removed, stopping it now ..."); + silentlyStopSchedulingMonitor(disabledPolicy); + runningSchedulingMonitors.remove(disabledPolicy); + } + } + + public synchronized void initialize(RMContext rmContext, + Configuration configuration) throws YarnException { + this.rmContext = rmContext; + stopAndRemoveAll(); + + updateSchedulingMonitors(configuration, false); + } + + public synchronized void reinitialize(RMContext rmContext, + Configuration configuration) throws YarnException { + this.rmContext = rmContext; + + updateSchedulingMonitors(configuration, true); + } + + public synchronized void startAll() { + for (SchedulingMonitor schedulingMonitor : runningSchedulingMonitors + .values()) { + schedulingMonitor.start(); + } + } + + private void silentlyStopSchedulingMonitor(String name) { + SchedulingMonitor mon = runningSchedulingMonitors.get(name); + try { + mon.stop(); + LOG.info("Sucessfully stopped monitor=" + mon.getName()); + } catch (Exception e) { + LOG.warn("Exception while stopping monitor=" + mon.getName(), e); + } + } + + private void stopAndRemoveAll() { + if (!runningSchedulingMonitors.isEmpty()) { + for (String schedulingMonitorName : runningSchedulingMonitors + .keySet()) { + silentlyStopSchedulingMonitor(schedulingMonitorName); + } + runningSchedulingMonitors.clear(); + } + } + + public boolean isRSMEmpty() { + return runningSchedulingMonitors.isEmpty(); + } + + public boolean isSameConfiguredPolicies(Set configurePoliciesSet) { + return configurePoliciesSet.equals(runningSchedulingMonitors.keySet()); + } + + public SchedulingMonitor getAvailableSchedulingMonitor() { + if (isRSMEmpty()) { + return null; + } + for (SchedulingMonitor smon : runningSchedulingMonitors.values()) { + if (smon.getSchedulingEditPolicy() + instanceof 
ProportionalCapacityPreemptionPolicy) { + return smon; + } + } + return null; + } + + public synchronized void stop() throws YarnException { + stopAndRemoveAll(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index e818dabdef..4749c3d899 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -168,6 +169,8 @@ public abstract class AbstractYarnScheduler // the NM in the next heartbeat. private boolean autoUpdateContainers = false; + protected SchedulingMonitorManager schedulingMonitorManager; + /** * Construct the service. 
* @@ -207,8 +210,8 @@ public void serviceInit(Configuration conf) throws Exception { new RMCriticalThreadUncaughtExceptionHandler(rmContext)); updateThread.setDaemon(true); } - super.serviceInit(conf); + } @Override @@ -216,6 +219,7 @@ protected void serviceStart() throws Exception { if (updateThread != null) { updateThread.start(); } + schedulingMonitorManager.startAll(); super.serviceStart(); } @@ -225,6 +229,9 @@ protected void serviceStop() throws Exception { updateThread.interrupt(); updateThread.join(THREAD_JOIN_TIMEOUT_MS); } + if (schedulingMonitorManager != null) { + schedulingMonitorManager.stop(); + } super.serviceStop(); } @@ -233,6 +240,11 @@ public ClusterNodeTracker getNodeTracker() { return nodeTracker; } + @VisibleForTesting + public SchedulingMonitorManager getSchedulingMonitorManager() { + return schedulingMonitorManager; + } + /* * YARN-3136 removed synchronized lock for this method for performance * purposes @@ -1415,4 +1427,15 @@ protected void triggerUpdate() { updateThreadMonitor.notify(); } } + + @Override + public void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + try { + LOG.info("Reinitializing SchedulingMonitorManager ..."); + schedulingMonitorManager.reinitialize(rmContext, conf); + } catch (YarnException e) { + throw new IOException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 218adf3d68..de93a6a248 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; @@ -390,6 +391,9 @@ public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); super.serviceInit(conf); initScheduler(configuration); + // Initialize SchedulingMonitorManager + schedulingMonitorManager = new SchedulingMonitorManager(); + schedulingMonitorManager.initialize(rmContext, conf); } @Override @@ -444,6 +448,8 @@ public void reinitialize(Configuration newConf, RMContext rmContext) // Setup how many containers we can allocate for each round offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + + super.reinitialize(newConf, rmContext); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 625009d697..ebc7222ccf 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -1352,6 +1353,10 @@ private void startSchedulerThreads() { public void serviceInit(Configuration conf) throws Exception { initScheduler(conf); super.serviceInit(conf); + + // Initialize SchedulingMonitorManager + schedulingMonitorManager = new SchedulingMonitorManager(); + schedulingMonitorManager.initialize(rmContext, conf); } @Override @@ -1389,6 +1394,7 @@ public void reinitialize(Configuration conf, RMContext rmContext) throws IOException { try { allocsLoader.reloadAllocations(); + super.reinitialize(conf, rmContext); } catch (Exception e) { LOG.error("Failed to reload allocations file", e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 3288912836..826575d31d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -255,6 +256,10 @@ private synchronized void initScheduler(Configuration conf) { public void serviceInit(Configuration conf) throws Exception { initScheduler(conf); super.serviceInit(conf); + + // Initialize SchedulingMonitorManager + schedulingMonitorManager = new SchedulingMonitorManager(); + schedulingMonitorManager.initialize(rmContext, conf); } @Override @@ -312,6 +317,7 @@ public synchronized void setRMContext(RMContext rmContext) { reinitialize(Configuration conf, RMContext rmContext) throws IOException { setConf(conf); + super.reinitialize(conf, rmContext); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 4ac4fc306b..439a449094 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -105,9 +105,35 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm) return am; } + private MockRM initMockRMWithOldConf(Configuration confForRM1) { + return new MockRM(confForRM1, null, false, false) { + @Override + protected AdminService createAdminService() { + return new AdminService(this) { + @Override + protected void startServer() { + // override to not start rpc handler + } + + @Override + protected void stopServer() { + // don't do anything + } + + @Override + protected Configuration loadNewConfiguration() + throws IOException, YarnException { + return confForRM1; + } + }; + } + }; + } + protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1, null, false, false); - rm2 = new MockRM(confForRM2, null, false, false); + rm1 = initMockRMWithOldConf(confForRM1); + rm2 = initMockRMWithOldConf(confForRM2); + startRMs(rm1, confForRM1, rm2, confForRM2); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java index c38236d0ab..84126c7287 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/TestSchedulingMonitor.java @@ -23,8 +23,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.junit.Test; +import java.util.HashSet; +import java.util.Set; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -51,4 +58,38 @@ public void testRMStarts() throws Exception { monitor.close(); rm.close(); } + + @Test(timeout = 10000) + public void testRMUpdateSchedulingEditPolicy() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + SchedulingMonitorManager smm = cs.getSchedulingMonitorManager(); + + // runningSchedulingMonitors should not be empty when initialize RM + // scheduler monitor + cs.reinitialize(conf, rm.getRMContext()); + assertFalse(smm.isRSMEmpty()); + + // make sure runningSchedulingPolicies contains all the configured policy + // in YARNConfiguration + String[] configuredPolicies = conf.getStrings( + 
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES); + Set configurePoliciesSet = new HashSet<>(); + for (String s : configuredPolicies) { + configurePoliciesSet.add(s); + } + assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet)); + + // disable RM scheduler monitor + conf.setBoolean( + YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS); + cs.reinitialize(conf, rm.getRMContext()); + assertTrue(smm.isRSMEmpty()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 694be09ee8..f0ca466d5c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; @@ -48,7 +49,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.util.Clock; @@ -792,21 +792,23 @@ public void testPolicyInitializeAfterSchedulerInitialized() { @SuppressWarnings("resource") MockRM rm = new MockRM(conf); rm.init(conf); - + // ProportionalCapacityPreemptionPolicy should be initialized after // CapacityScheduler initialized. We will // 1) find SchedulingMonitor from RMActiveService's service list, // 2) check if ResourceCalculator in policy is null or not. // If it's not null, we can come to a conclusion that policy initialized // after scheduler got initialized - for (Service service : rm.getRMActiveService().getServices()) { - if (service instanceof SchedulingMonitor) { - ProportionalCapacityPreemptionPolicy policy = - (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service) - .getSchedulingEditPolicy(); - assertNotNull(policy.getResourceCalculator()); - return; - } + // Get SchedulingMonitor from SchedulingMonitorManager instead + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + SchedulingMonitorManager smm = cs.getSchedulingMonitorManager(); + Service service = smm.getAvailableSchedulingMonitor(); + if (service instanceof SchedulingMonitor) { + ProportionalCapacityPreemptionPolicy policy = + (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service) + .getSchedulingEditPolicy(); + assertNotNull(policy.getResourceCalculator()); + return; } fail("Failed to find SchedulingMonitor service, please check what 
happened"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java index 4e4e3c2064..a4c7d61fff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -126,7 +128,11 @@ public void testSimplePreemption() throws Exception { Resources.createResource(1 * GB), 1)), null); // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if one container from app1 marked // to be "killable" @@ -209,7 +215,11 @@ public void testPreemptionConsidersNodeLocalityDelay() Resources.createResource(1 * GB), 1)), null); // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if one container from app1 marked // to be "killable" @@ -301,7 +311,11 @@ public void testPreemptionConsidersHardNodeLocality() Resources.createResource(1 * GB), 1, false)), null); // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if one container from app1 marked // to be "killable" @@ -387,8 +401,11 @@ public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() am2.allocate("*", 1 * GB, 1, new ArrayList()); // Get edit policy and do one update + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if one container from app1 marked // to be "killable" @@ -487,8 +504,11 @@ public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded( am2.allocate("*", 3 * GB, 1, new ArrayList()); // Get edit policy and do one update + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if 3 container from app1 marked // to be "killable" @@ -582,7 +602,11 @@ public void testPreemptionConsidersUserLimit() Resources.createResource(1 * GB), 1)), null); // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if no container from app1 marked // to be "killable" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 9146373d10..8a7e03f856 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -26,7 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; -import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -138,7 +139,11 @@ public void 
testSimpleSurgicalPreemption() Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); // Get edit policy and do one update - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); // Call edit schedule twice, and check if 4 containers from app1 at n1 killed editPolicy.editSchedule(); @@ -217,8 +222,11 @@ public void testSurgicalPreemptionWithAvailableResource() ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); // Call editSchedule: containers are selected to be preemption candidate + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); Assert.assertEquals(3, editPolicy.getToPreemptContainers().size()); @@ -323,8 +331,11 @@ public void testPriorityPreemptionWhenAllQueuesAreBelowGuaranteedCapacities() } // Call editSchedule immediately: containers are not selected + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); @@ -434,8 +445,11 @@ public void testPriorityPreemptionRequiresMoveReservation() cs.getNode(rmNode3.getNodeID()).getReservedContainer()); // Call editSchedule immediately: nothing happens + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); Assert.assertNotNull( cs.getNode(rmNode3.getNodeID()).getReservedContainer()); @@ -562,8 +576,11 @@ public void testPriorityPreemptionOnlyTriggeredWhenDemandingQueueUnsatisfied() // 6 (selected) + 1 (allocated) which makes target capacity to 70% Thread.sleep(1000); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); checkNumberOfPreemptionCandidateFromApp(editPolicy, 6, am1.getApplicationAttemptId()); @@ -715,8 +732,11 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() Thread.sleep(1000); /* 1st container preempted is on n2 */ + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. 
+ getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); ProportionalCapacityPreemptionPolicy editPolicy = - (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); // We should have one to-preempt container, on node[2] @@ -887,7 +907,11 @@ public void testPreemptionForFragmentatedCluster() throws Exception { waitNumberOfReservedContainersFromApp(schedulerApp2, 1); // Call editSchedule twice and allocation once, container should get allocated - SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + SchedulingMonitorManager smm = ((CapacityScheduler) rm1. + getResourceScheduler()).getSchedulingMonitorManager(); + SchedulingMonitor smon = smm.getAvailableSchedulingMonitor(); + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy(); editPolicy.editSchedule(); editPolicy.editSchedule();