diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index c39d57d3b3..92b04d7779 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -496,4 +496,15 @@ public List<N> getNodesPerPartition(String partition) {
     }
     return nodesPerPartition;
   }
+
+  public List<String> getPartitions() {
+    List<String> partitions = null;
+    readLock.lock();
+    try {
+      partitions = new ArrayList<>(nodesPerLabel.keySet());
+    } finally {
+      readLock.unlock();
+    }
+    return partitions;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e25301bb17..51df2242be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -531,6 +531,8 @@ private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
 
   /**
    * Schedule on all nodes by starting at a random point.
+   * Schedule on all partitions by starting at a random partition
+   * when multiNodePlacementEnabled is true.
    * @param cs
    */
   static void schedule(CapacityScheduler cs) throws InterruptedException{
@@ -544,44 +546,79 @@ static void schedule(CapacityScheduler cs) throws InterruptedException{
     if(nodeSize == 0) {
       return;
     }
 
-    int start = random.nextInt(nodeSize);
-    // To avoid too verbose DEBUG logging, only print debug log once for
-    // every 10 secs.
-    boolean printSkipedNodeLogging = false;
-    if (Time.monotonicNow() / 1000 % 10 == 0) {
-      printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
-    } else {
-      printedVerboseLoggingForAsyncScheduling = false;
-    }
+    if (!cs.multiNodePlacementEnabled) {
+      int start = random.nextInt(nodeSize);
 
-    // Allocate containers of node [start, end)
-    for (FiCaSchedulerNode node : nodes) {
-      if (current++ >= start) {
+      // To avoid too verbose DEBUG logging, only print debug log once for
+      // every 10 secs.
+      boolean printSkipedNodeLogging = false;
+      if (Time.monotonicNow() / 1000 % 10 == 0) {
+        printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+      } else {
+        printedVerboseLoggingForAsyncScheduling = false;
+      }
+
+      // Allocate containers of node [start, end)
+      for (FiCaSchedulerNode node : nodes) {
+        if (current++ >= start) {
+          if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+            continue;
+          }
+          cs.allocateContainersToNode(node.getNodeID(), false);
+        }
+      }
+
+      current = 0;
+
+      // Allocate containers of node [0, start)
+      for (FiCaSchedulerNode node : nodes) {
+        if (current++ > start) {
+          break;
+        }
         if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
           continue;
         }
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
-    }
 
     current = 0;
-
-    // Allocate containers of node [0, start)
-    for (FiCaSchedulerNode node : nodes) {
-      if (current++ > start) {
-        break;
+      if (printSkipedNodeLogging) {
+        printedVerboseLoggingForAsyncScheduling = true;
       }
-      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
-        continue;
+    } else {
+      // Get all partitions
+      List<String> partitions = cs.nodeTracker.getPartitions();
+      int partitionSize = partitions.size();
+      // First randomize the start point
+      int start = random.nextInt(partitionSize);
+      // Allocate containers of partition [start, end)
+      for (String partition : partitions) {
+        if (current++ >= start) {
+          CandidateNodeSet<FiCaSchedulerNode> candidates =
+              cs.getCandidateNodeSet(partition);
+          if (candidates == null) {
+            continue;
+          }
+          cs.allocateContainersToNode(candidates, false);
+        }
       }
-      cs.allocateContainersToNode(node.getNodeID(), false);
-    }
-
-    if (printSkipedNodeLogging) {
-      printedVerboseLoggingForAsyncScheduling = true;
-    }
+      current = 0;
+      // Allocate containers of partition [0, start)
+      for (String partition : partitions) {
+        if (current++ > start) {
+          break;
+        }
+        CandidateNodeSet<FiCaSchedulerNode> candidates =
+            cs.getCandidateNodeSet(partition);
+        if (candidates == null) {
+          continue;
+        }
+        cs.allocateContainersToNode(candidates, false);
+      }
+
+    }
 
     Thread.sleep(cs.getAsyncScheduleInterval());
   }
 
@@ -1486,17 +1523,34 @@ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
   }
 
   private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
-      FiCaSchedulerNode node) {
+      String partition) {
+    CandidateNodeSet<FiCaSchedulerNode> candidates = null;
+    Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
+    List<FiCaSchedulerNode> nodes = nodeTracker
+        .getNodesPerPartition(partition);
+    if (nodes != null && !nodes.isEmpty()) {
+      // Filter out nodes whose heartbeat has been missing for too long
+      nodes.stream()
+          .filter(node -> !shouldSkipNodeSchedule(node, this, true))
+          .forEach(n -> nodesByPartition.put(n.getNodeID(), n));
+      candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
+          nodesByPartition, partition);
+    }
+    return candidates;
+  }
+
+  private CandidateNodeSet<FiCaSchedulerNode> getCandidateNodeSet(
+      FiCaSchedulerNode node) {
     CandidateNodeSet<FiCaSchedulerNode> candidates = null;
     candidates = new SimpleCandidateNodeSet<>(node);
     if (multiNodePlacementEnabled) {
       Map<NodeId, FiCaSchedulerNode> nodesByPartition = new HashMap<>();
       List<FiCaSchedulerNode> nodes = nodeTracker
-        .getNodesPerPartition(node.getPartition());
+          .getNodesPerPartition(node.getPartition());
       if (nodes != null && !nodes.isEmpty()) {
         nodes.forEach(n -> nodesByPartition.put(n.getNodeID(), n));
         candidates = new SimpleCandidateNodeSet<FiCaSchedulerNode>(
-          nodesByPartition, node.getPartition());
+            nodesByPartition, node.getPartition());
       }
     }
     return candidates;
@@ -1513,8 +1567,8 @@ private void allocateContainersToNode(NodeId nodeId,
       int offswitchCount = 0;
       int assignedContainers = 0;
 
-      CandidateNodeSet<FiCaSchedulerNode> candidates = getCandidateNodeSet(
-          node);
+      CandidateNodeSet<FiCaSchedulerNode> candidates =
+          getCandidateNodeSet(node);
       CSAssignment assignment = allocateContainersToNode(candidates,
           withNodeHeartbeat);
       // Only check if we can allocate more container on the same node when
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 59ab077ba4..5f2bbf0190 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -85,6 +85,10 @@ public class TestCapacitySchedulerAsyncScheduling {
 
   private NMHeartbeatThread nmHeartbeatThread = null;
 
+  private static final String POLICY_CLASS_NAME =
+      "org.apache.hadoop.yarn.server.resourcemanager.scheduler"
+          + ".placement.ResourceUsageMultiNodeLookupPolicy";
+
   @Before
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
@@ -111,6 +115,21 @@ public void testThreeThreadsAsyncContainerAllocation() throws Exception {
     testAsyncContainerAllocation(3);
   }
 
+  @Test(timeout = 300000)
+  public void testAsyncContainerAllocationWithMultiNode() throws Exception {
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+        "resource-based");
+    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+        "resource-based");
+    String policyName =
+        CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+            + ".resource-based" + ".class";
+    conf.set(policyName, POLICY_CLASS_NAME);
+    conf.setBoolean(CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED,
+        true);
+    testAsyncContainerAllocation(2);
+  }
+
   public void testAsyncContainerAllocation(int numThreads) throws Exception {
     conf.setInt(
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
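The new test only layers the multi-node settings on top of the shared async helper. A minimal sketch of the combined configuration that drives schedule() into the partition-based branch is shown below; the class name is invented for illustration, and SCHEDULE_ASYNCHRONOUSLY_ENABLE is an existing CapacitySchedulerConfiguration key that this patch does not touch (the shared testAsyncContainerAllocation() helper is assumed to set the async keys itself).

// Sketch only: configuration for async scheduling plus multi-node placement.
// The MULTI_NODE_* keys mirror testAsyncContainerAllocationWithMultiNode()
// above; SCHEDULE_ASYNCHRONOUSLY_ENABLE is a pre-existing key, assumed here.
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public final class AsyncMultiNodeConfSketch {

  /** Builds a configuration that exercises the partition branch of schedule(). */
  public static YarnConfiguration build() {
    YarnConfiguration conf = new YarnConfiguration();

    // Asynchronous scheduling with two scheduling threads.
    conf.setBoolean(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
    conf.setInt(
        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 2);

    // Multi-node placement with a resource-usage based lookup policy,
    // wired up the same way as the new test.
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
        "resource-based");
    conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
            + ".resource-based.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler"
            + ".placement.ResourceUsageMultiNodeLookupPolicy");
    conf.setBoolean(
        CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
    return conf;
  }
}

Either branch of schedule() still sleeps for getAsyncScheduleInterval() between passes; the behavioural change is only that, with MULTI_NODE_PLACEMENT_ENABLED set, the async thread iterates over the partitions returned by ClusterNodeTracker.getPartitions() instead of over individual nodes.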