YARN-4557. Fix improper Queues sorting in PartitionedQueueComparator when accessible-node-labels=*. (Naganarasimha G R via wangda)
This commit is contained in:
parent
1708a4cd23
commit
5ff5f67332
@ -1289,6 +1289,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-4565. Fix a bug that leads to AM resource limit not hornored when
|
||||
sizeBasedWeight enabled for FairOrderingPolicy. (wtan via jianhe)
|
||||
|
||||
YARN-4557. Fix improper Queues sorting in PartitionedQueueComparator
|
||||
when accessible-node-labels=*. (Naganarasimha G R via wangda)
|
||||
|
||||
Release 2.7.3 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -57,7 +57,7 @@
|
||||
public class AppSchedulingInfo {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
|
||||
private static final Comparator COMPARATOR =
|
||||
private static final Comparator<Priority> COMPARATOR =
|
||||
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator();
|
||||
private static final int EPOCH_BIT_SHIFT = 40;
|
||||
|
||||
|
@ -20,6 +20,8 @@
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
||||
public class PartitionedQueueComparator implements Comparator<CSQueue> {
|
||||
private String partitionToLookAt = null;
|
||||
|
||||
@ -35,9 +37,11 @@ public int compare(CSQueue q1, CSQueue q2) {
|
||||
* the other not, accessible queue goes first.
|
||||
*/
|
||||
boolean q1Accessible =
|
||||
q1.getAccessibleNodeLabels().contains(partitionToLookAt);
|
||||
q1.getAccessibleNodeLabels().contains(partitionToLookAt)
|
||||
|| q1.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
|
||||
boolean q2Accessible =
|
||||
q2.getAccessibleNodeLabels().contains(partitionToLookAt);
|
||||
q2.getAccessibleNodeLabels().contains(partitionToLookAt)
|
||||
|| q2.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
|
||||
if (q1Accessible && !q2Accessible) {
|
||||
return -1;
|
||||
} else if (!q1Accessible && q2Accessible) {
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -502,7 +503,6 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
};
|
||||
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||
rm1.start();
|
||||
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // no label
|
||||
MockNM nm2 = rm1.registerNode("h2:1234", 40 * GB); // label = y
|
||||
// launch an app to queue b1 (label = y), AM container should be launched in
|
||||
// nm2
|
||||
@ -1470,9 +1470,11 @@ public void testOrderOfAllocationOnPartitions()
|
||||
csConf.setCapacityByLabel(B, "x", 70);
|
||||
|
||||
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
csConf.setAccessibleNodeLabels(C, Collections.<String> emptySet());
|
||||
csConf.setCapacity(C, 25);
|
||||
|
||||
final String D = CapacitySchedulerConfiguration.ROOT + ".d";
|
||||
csConf.setAccessibleNodeLabels(D, Collections.<String> emptySet());
|
||||
csConf.setCapacity(D, 25);
|
||||
|
||||
// set node -> label
|
||||
@ -1601,4 +1603,77 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOrderOfAllocationOnPartitionsWhenAccessibilityIsAll()
|
||||
throws Exception {
|
||||
/**
|
||||
* Test case: have a following queue structure:
|
||||
*
|
||||
* <pre>
|
||||
* root
|
||||
* __________
|
||||
* / \
|
||||
* a (*) b (x)
|
||||
* </pre>
|
||||
*
|
||||
* Both queues a/b can access x, we need to verify whether * accessibility
|
||||
* is considered in ordering of queues
|
||||
*/
|
||||
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration(this.conf);
|
||||
|
||||
// Define top-level queues
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] { "a", "b" });
|
||||
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||
|
||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
csConf.setCapacity(A, 25);
|
||||
csConf.setAccessibleNodeLabels(A, toSet("*"));
|
||||
csConf.setCapacityByLabel(A, "x", 60);
|
||||
|
||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
csConf.setCapacity(B, 75);
|
||||
csConf.setAccessibleNodeLabels(B, toSet("x"));
|
||||
csConf.setCapacityByLabel(B, "x", 40);
|
||||
|
||||
// set node -> label
|
||||
mgr.addToCluserNodeLabels(
|
||||
ImmutableSet.of(NodeLabel.newInstance("x", false)));
|
||||
mgr.addLabelsToNode(
|
||||
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
|
||||
|
||||
// inject node label manager
|
||||
MockRM rm = new MockRM(csConf) {
|
||||
@Override
|
||||
public RMNodeLabelsManager createNodeLabelManager() {
|
||||
return mgr;
|
||||
}
|
||||
};
|
||||
|
||||
rm.getRMContext().setNodeLabelManager(mgr);
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
|
||||
|
||||
// app1 -> a
|
||||
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a", "x");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
// app2 -> b
|
||||
RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b", "x");
|
||||
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
|
||||
|
||||
// Both a/b has used_capacity(x) = 0, when doing exclusive allocation, a
|
||||
// will go first since a has more capacity(x)
|
||||
am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
|
||||
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
|
||||
doNMHeartbeat(rm, nm1.getNodeId(), 1);
|
||||
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
|
||||
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user