YARN-10675. Consolidate YARN-10672 and YARN-10447. Contributed by Szilard Nemeth.

This commit is contained in:
Peter Bacsko 2021-03-09 11:34:37 +01:00
parent e472ee2aa5
commit 20416bc62d
2 changed files with 8 additions and 19 deletions

View File

@ -249,8 +249,6 @@ public Configuration getConf() {
private CSMaxRunningAppsEnforcer maxRunningEnforcer; private CSMaxRunningAppsEnforcer maxRunningEnforcer;
private boolean activitiesManagerEnabled = true;
public CapacityScheduler() { public CapacityScheduler() {
super(CapacityScheduler.class.getName()); super(CapacityScheduler.class.getName());
this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this); this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
@ -351,9 +349,7 @@ void initScheduler(Configuration configuration) throws
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager(); this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
this.activitiesManager = new ActivitiesManager(rmContext); this.activitiesManager = new ActivitiesManager(rmContext);
if (activitiesManagerEnabled) {
activitiesManager.init(conf); activitiesManager.init(conf);
}
initializeQueues(this.conf); initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
@ -411,9 +407,7 @@ void initScheduler(Configuration configuration) throws
private void startSchedulerThreads() { private void startSchedulerThreads() {
writeLock.lock(); writeLock.lock();
try { try {
if (activitiesManagerEnabled) {
activitiesManager.start(); activitiesManager.start();
}
if (scheduleAsynchronously) { if (scheduleAsynchronously) {
Preconditions.checkNotNull(asyncSchedulerThreads, Preconditions.checkNotNull(asyncSchedulerThreads,
"asyncSchedulerThreads is null"); "asyncSchedulerThreads is null");
@ -447,9 +441,7 @@ public void serviceStart() throws Exception {
public void serviceStop() throws Exception { public void serviceStop() throws Exception {
writeLock.lock(); writeLock.lock();
try { try {
if (activitiesManagerEnabled) {
this.activitiesManager.stop(); this.activitiesManager.stop();
}
if (scheduleAsynchronously && asyncSchedulerThreads != null) { if (scheduleAsynchronously && asyncSchedulerThreads != null) {
for (Thread t : asyncSchedulerThreads) { for (Thread t : asyncSchedulerThreads) {
t.interrupt(); t.interrupt();
@ -3480,7 +3472,6 @@ public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) {
this.maxRunningEnforcer = enforcer; this.maxRunningEnforcer = enforcer;
} }
/** /**
* Returning true as capacity scheduler supports placement constraints. * Returning true as capacity scheduler supports placement constraints.
*/ */
@ -3489,11 +3480,6 @@ public boolean placementConstraintEnabled() {
return true; return true;
} }
@VisibleForTesting
public void setActivitiesManagerEnabled(boolean enabled) {
this.activitiesManagerEnabled = enabled;
}
@VisibleForTesting @VisibleForTesting
public void setQueueManager(CapacitySchedulerQueueManager qm) { public void setQueueManager(CapacitySchedulerQueueManager qm) {
this.queueManager = qm; this.queueManager = qm;

View File

@ -162,9 +162,13 @@ private void setUpWithNodeLabels() throws Exception {
private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels) private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
throws Exception { throws Exception {
CapacityScheduler spyCs = new CapacityScheduler(); CapacityScheduler spyCs = new CapacityScheduler();
spyCs.setActivitiesManagerEnabled(false);
queues = new CSQueueStore(); queues = new CSQueueStore();
cs = spy(spyCs); cs = spy(spyCs);
//All stub calls on the spy object of the 'cs' field should happen
//before cs.start() is invoked. See YARN-10672 for more details.
when(cs.getNumClusterNodes()).thenReturn(3);
rmContext = TestUtils.getMockRMContext(); rmContext = TestUtils.getMockRMContext();
spyRMContext = spy(rmContext); spyRMContext = spy(rmContext);
@ -231,7 +235,6 @@ private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
when(spyRMContext.getScheduler()).thenReturn(cs); when(spyRMContext.getScheduler()).thenReturn(cs);
when(spyRMContext.getYarnConfiguration()) when(spyRMContext.getYarnConfiguration())
.thenReturn(new YarnConfiguration()); .thenReturn(new YarnConfiguration());
when(cs.getNumClusterNodes()).thenReturn(3);
cs.start(); cs.start();
} }