YARN-10995. Move PendingApplicationComparator from GuaranteedOrZeroCapacityOverTimePolicy. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2022-02-17 19:43:37 +01:00
parent 51001fc7bf
commit 4c05d257ba
4 changed files with 35 additions and 38 deletions

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -201,6 +202,9 @@ public class CapacityScheduler extends
private int threadNum = 0;
private final PendingApplicationComparator applicationComparator =
new PendingApplicationComparator();
@Override
public void setConf(Configuration conf) {
yarnConf = conf;
@ -3555,4 +3559,31 @@ public List<AsyncScheduleThread> getAsyncSchedulerThreads() {
return asyncSchedulerThreads;
}
}
@Override
public PendingApplicationComparator getPendingApplicationComparator(){
return applicationComparator;
}
/**
* Comparator that orders applications by their submit time.
*/
class PendingApplicationComparator
implements Comparator<FiCaSchedulerApp> {
@Override
public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
RMApp rmApp1 = rmContext.getRMApps().get(app1.getApplicationId());
RMApp rmApp2 = rmContext.getRMApps().get(app2.getApplicationId());
if (rmApp1 != null && rmApp2 != null) {
return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
} else if (rmApp1 != null) {
return -1;
} else if (rmApp2 != null) {
return 1;
} else{
return 0;
}
}
}
}

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -104,4 +102,5 @@ public interface CapacitySchedulerContext {
*/
Clock getClock();
CapacityScheduler.PendingApplicationComparator getPendingApplicationComparator();
}

View File

@ -19,11 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@ -129,8 +127,7 @@ public FiCaSchedulerApp getApplicationAttempt(
return csContext.getApplicationAttempt(applicationAttemptId);
}
// TODO this is used in GuaranteedOrZeroCapacityOverTimePolicy, refactor the comparator there
public RMApp getRMApp(ApplicationId applicationId) {
return csContext.getRMContext().getRMApps().get(applicationId);
public CapacityScheduler.PendingApplicationComparator getApplicationComparator() {
return csContext.getPendingApplicationComparator();
}
}

View File

@ -22,7 +22,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
@ -42,8 +41,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -253,33 +250,6 @@ void clear() {
}
}
/**
* Comparator that orders applications by their submit time
*/
private class PendingApplicationComparator
implements Comparator<FiCaSchedulerApp> {
@Override
public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
RMApp rmApp1 = managedParentQueue.getQueueContext().getRMApp(
app1.getApplicationId());
RMApp rmApp2 = managedParentQueue.getQueueContext().getRMApp(
app2.getApplicationId());
if (rmApp1 != null && rmApp2 != null) {
return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
} else if (rmApp1 != null) {
return -1;
} else if (rmApp2 != null) {
return 1;
} else{
return 0;
}
}
}
private PendingApplicationComparator applicationComparator =
new PendingApplicationComparator();
@Override
public void init(final ParentQueue parentQueue) throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@ -809,7 +779,7 @@ public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
private List<FiCaSchedulerApp> getSortedPendingApplications() {
List<FiCaSchedulerApp> apps = new ArrayList<>(
managedParentQueue.getAllApplications());
Collections.sort(apps, applicationComparator);
apps.sort(managedParentQueue.getQueueContext().getApplicationComparator());
return apps;
}