From 4c05d257ba3f3311b5bbc993f6e5e35637487d88 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 17 Feb 2022 19:43:37 +0100 Subject: [PATCH] YARN-10995. Move PendingApplicationComparator from GuaranteedOrZeroCapacityOverTimePolicy. Contributed by Benjamin Teke --- .../scheduler/capacity/CapacityScheduler.java | 31 ++++++++++++++++++ .../capacity/CapacitySchedulerContext.java | 3 +- .../CapacitySchedulerQueueContext.java | 7 ++-- ...uaranteedOrZeroCapacityOverTimePolicy.java | 32 +------------------ 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 16c18bcd5c..44e80a6c23 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -201,6 +202,9 @@ public class CapacityScheduler extends private int threadNum = 0; + private final PendingApplicationComparator applicationComparator = + new PendingApplicationComparator(); + @Override public void setConf(Configuration conf) { yarnConf = conf; @@ -3555,4 +3559,31 @@ public List getAsyncSchedulerThreads() { return asyncSchedulerThreads; } } + + @Override + public PendingApplicationComparator getPendingApplicationComparator(){ + return applicationComparator; + } + + /** + * Comparator that orders applications by their submit time. + */ + class PendingApplicationComparator + implements Comparator { + + @Override + public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { + RMApp rmApp1 = rmContext.getRMApps().get(app1.getApplicationId()); + RMApp rmApp2 = rmContext.getRMApps().get(app2.getApplicationId()); + if (rmApp1 != null && rmApp2 != null) { + return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); + } else if (rmApp1 != null) { + return -1; + } else if (rmApp2 != null) { + return 1; + } else{ + return 0; + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 1d0600f668..5795a7e30f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.Comparator; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -104,4 +102,5 @@ public interface CapacitySchedulerContext { */ Clock getClock(); + CapacityScheduler.PendingApplicationComparator getPendingApplicationComparator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java index df7a627456..4d11137655 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueContext.java @@ -19,11 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -129,8 +127,7 @@ public FiCaSchedulerApp getApplicationAttempt( return csContext.getApplicationAttempt(applicationAttemptId); } - // TODO this is used in GuaranteedOrZeroCapacityOverTimePolicy, refactor the comparator there - public RMApp getRMApp(ApplicationId applicationId) { - return csContext.getRMContext().getRMApps().get(applicationId); + public CapacityScheduler.PendingApplicationComparator getApplicationComparator() { + return csContext.getPendingApplicationComparator(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java index 35275574b7..0d51983bde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java @@ -22,7 +22,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils; @@ -42,8 +41,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -253,33 +250,6 @@ void clear() { } } - /** - * Comparator that orders applications by their submit time - */ - private class PendingApplicationComparator - implements Comparator { - - @Override - public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) { - RMApp rmApp1 = managedParentQueue.getQueueContext().getRMApp( - app1.getApplicationId()); - RMApp rmApp2 = managedParentQueue.getQueueContext().getRMApp( - app2.getApplicationId()); - if (rmApp1 != null && rmApp2 != null) { - return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime()); - } else if (rmApp1 != null) { - return -1; - } else if (rmApp2 != null) { - return 1; - } else{ - return 0; - } - } - } - - private PendingApplicationComparator applicationComparator = - new PendingApplicationComparator(); - @Override public void init(final ParentQueue parentQueue) throws IOException { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -809,7 +779,7 @@ public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) { private List getSortedPendingApplications() { List apps = new ArrayList<>( managedParentQueue.getAllApplications()); - Collections.sort(apps, applicationComparator); + apps.sort(managedParentQueue.getQueueContext().getApplicationComparator()); return apps; }