From b0e5465244f2f9e963fa86b67b26fb3fe5d7f426 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 17 Mar 2024 22:12:04 +0800 Subject: [PATCH] =?UTF-8?q?=20=E5=A2=9E=E5=8A=A0=E5=AE=B9=E9=87=8F?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=99=A8=E7=9A=84=E4=BB=A3=E7=A0=81=E6=B3=A8?= =?UTF-8?q?=E9=87=8A=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resourcemanager/scheduler/capacity/CSQueue.java | 2 ++ .../scheduler/capacity/CapacityScheduler.java | 13 +++++++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 77d8fa9c7c..85988eae0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -213,6 +213,8 @@ public void finishApplicationAttempt(FiCaSchedulerApp application, /** * Assign containers to applications in the queue or it's children (if any). + * 分配资源,包括父队列以及子队列 + * * @param clusterResource the resource of the cluster. * @param candidates {@link CandidateNodeSet} the nodes that are considered * for the current placement. 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e378c273f8..3e068888a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -515,6 +515,13 @@ long getAsyncScheduleInterval() { private final static Random random = new Random(System.currentTimeMillis()); + /** + * 跳过已经有2次心跳周期没有上报的节点。 + * @param node + * @param cs + * @param printVerboseLog + * @return + */ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, CapacityScheduler cs, boolean printVerboseLog) { // Skip node which missed 2 heartbeats since the node might be dead and @@ -557,7 +564,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ printedVerboseLoggingForAsyncScheduling = false; } - // Allocate containers of node [start, end) + // Allocate containers of node [start, end) 从随机的节点开始分配,可以保证分配均衡。 for (FiCaSchedulerNode node : nodes) { if (current++ >= start) { if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) { @@ -569,7 +576,7 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{ current = 0; - // Allocate containers of node [0, start) + // Allocate containers of node [0, start) 从0到随机的节点 for (FiCaSchedulerNode node : nodes) { if (current++ > start) { break; @@ -1796,6 +1803,7 @@ CSAssignment allocateContainersToNode( if (!multiNodePlacementEnabled) { 
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, node.getNodeID()); + // 申请资源到单个节点,老版本逻辑 assignment = allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat); ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, @@ -1803,6 +1811,7 @@ CSAssignment allocateContainersToNode( } else{ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, ActivitiesManager.EMPTY_NODE_ID); + // 申请资源到多个节点,新逻辑 assignment = allocateContainersOnMultiNodes(candidates); ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, ActivitiesManager.EMPTY_NODE_ID, candidates.getPartition());