YARN-10639. Queueinfo related capacity, should adjusted to weight mode. Contributed by Qi Zhu.

This commit is contained in:
Peter Bacsko 2021-03-05 13:18:06 +01:00
parent 6699198b54
commit e19c00925f
12 changed files with 87 additions and 18 deletions

View File

@ -59,7 +59,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled) {
boolean preemptionDisabled, float weight) {
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setCapacity(capacity);
@ -72,6 +72,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
queueInfo.setDefaultNodeLabelExpression(defaultNodeLabelExpression);
queueInfo.setQueueStatistics(queueStatistics);
queueInfo.setPreemptionDisabled(preemptionDisabled);
queueInfo.setWeight(weight);
return queueInfo;
}
@ -82,14 +83,14 @@ public static QueueInfo newInstance(String queueName, float capacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled,
boolean preemptionDisabled, float weight,
Map<String, QueueConfigurations> queueConfigurations) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
maximumCapacity, currentCapacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled);
preemptionDisabled, weight);
queueInfo.setQueueConfigurations(queueConfigurations);
return queueInfo;
}
@ -101,7 +102,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled,
boolean preemptionDisabled, float weight,
Map<String, QueueConfigurations> queueConfigurations,
boolean intraQueuePreemptionDisabled) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
@ -109,7 +110,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled, queueConfigurations);
preemptionDisabled, weight, queueConfigurations);
queueInfo.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
return queueInfo;
}
@ -137,6 +138,18 @@ public static QueueInfo newInstance(String queueName, float capacity,
@Private
@Unstable
public abstract void setCapacity(float capacity);
/**
* Get the <em>configured weight</em> of the queue.
* @return <em>configured weight</em> of the queue
*/
@Public
@Stable
public abstract float getWeight();
@Private
@Unstable
public abstract void setWeight(float weight);
/**
* Get the <em>maximum capacity</em> of the queue.

View File

@ -609,6 +609,7 @@ message QueueInfoProto {
optional bool preemptionDisabled = 11;
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
optional bool intraQueuePreemptionDisabled = 13;
optional float weight = 14;
}
message QueueConfigurationsProto {

View File

@ -135,6 +135,8 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
writer.println(df.format(queueInfo.getCurrentCapacity() * 100) + "%");
writer.print("\tMaximum Capacity : ");
writer.println(df.format(queueInfo.getMaximumCapacity() * 100) + "%");
writer.print("\tWeight : ");
writer.println(df.format(queueInfo.getWeight()));
writer.print("\tDefault Node Label expression : ");
String nodeLabelExpression = queueInfo.getDefaultNodeLabelExpression();
nodeLabelExpression =

View File

@ -669,8 +669,9 @@ public List<NodeReport> createFakeNodeReports() {
public QueueInfo createFakeQueueInfo() {
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
createFakeAppReports(), QueueState.RUNNING, null, null, null, false,
null, false);
createFakeAppReports(), QueueState.RUNNING, null,
null, null, false, -1.0f,
null, false);
}
public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {

View File

@ -1719,7 +1719,8 @@ public void testGetQueueInfo() throws Exception {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null,
null, null, QueueState.RUNNING, nodeLabels,
"GPU", null, false, -1.0f, null,
false);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
@ -1733,6 +1734,7 @@ public void testGetQueueInfo() throws Exception {
pw.println("\tCapacity : " + "40.00%");
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tDefault Node Label expression : " + "GPU");
pw.println("\tAccessible Node Labels : " + "JDK_7,GPU");
pw.println("\tPreemption : " + "enabled");
@ -1887,7 +1889,8 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null, true, null, true);
null, null, QueueState.RUNNING, null, null, null,
true, -1.0f, null, true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1900,6 +1903,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
pw.println("\tCapacity : " + "40.00%");
pw.println("\tCurrent Capacity : " + "50.00%");
pw.println("\tMaximum Capacity : " + "80.00%");
pw.println("\tWeight : " + "-1.00");
pw.println("\tDefault Node Label expression : "
+ NodeLabel.DEFAULT_NODE_LABEL_PARTITION);
pw.println("\tAccessible Node Labels : ");

View File

@ -124,6 +124,18 @@ public void setCapacity(float capacity) {
builder.setCapacity(capacity);
}
@Override
public float getWeight() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasWeight()) ? p.getWeight() : -1;
}
@Override
public void setWeight(float weight) {
maybeInitBuilder();
builder.setWeight(weight);
}
@Override
public void setChildQueues(List<QueueInfo> childQueues) {
if (childQueues == null) {

View File

@ -434,7 +434,7 @@ public static void setup() throws Exception {
// it is recursive(has sub queues)
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
"x && y", null, false, null, false));
"x && y", null, false, -1.0f, null, false));
generateByNewInstance(QueueStatistics.class);
generateByNewInstance(QueueUserACLInfo.class);
generateByNewInstance(YarnClusterMetrics.class);

View File

@ -727,6 +727,7 @@ protected QueueInfo getQueueInfo() {
queueInfo.setIntraQueuePreemptionDisabled(
getIntraQueuePreemptionDisabled());
queueInfo.setQueueConfigurations(getQueueConfigurations());
queueInfo.setWeight(queueCapacities.getWeight());
return queueInfo;
}

View File

@ -67,7 +67,7 @@ public void setUp() throws IOException {
private void mockQueue(String queueName, MutableConfScheduler scheduler)
throws IOException {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
null, null, null, null, false, null, false);
null, null, null, null, false, -1.0f, null, false);
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo);
Queue queue = mock(Queue.class);

View File

@ -217,7 +217,7 @@ private Queue createQueue(String name, Queue parent) {
private Queue createQueue(String name, Queue parent, float capacity) {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);

View File

@ -257,7 +257,7 @@ public static Configuration getCSConfWithLabelsParentUsePctChildUseWeight(
*/
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightOnly() throws Exception {
internalTestContainerAlloationWithNodeLabel(
internalTestContainerAllocationWithNodeLabel(
getCSConfWithQueueLabelsWeightOnly(conf));
}
@ -270,7 +270,7 @@ public void testContainerAllocateWithComplexLabelsWeightOnly() throws Exception
*/
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed1() throws Exception {
internalTestContainerAlloationWithNodeLabel(
internalTestContainerAllocationWithNodeLabel(
getCSConfWithLabelsParentUseWeightChildUsePct(conf));
}
@ -283,7 +283,7 @@ public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed1() throw
*/
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed2() throws Exception {
internalTestContainerAlloationWithNodeLabel(
internalTestContainerAllocationWithNodeLabel(
getCSConfWithLabelsParentUsePctChildUseWeight(conf));
}
@ -338,8 +338,43 @@ public void testGetCapacityOrWeightStringParentPctLeafWeights()
}
}
private void internalTestContainerAlloationWithNodeLabel(Configuration csConf)
throws Exception {
@Test
public void testQueueInfoWeight() throws Exception {
MockRM rm = new MockRM(conf);
rm.init(conf);
rm.start();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
conf);
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b", "default"});
csConf.setNonLabeledQueueWeight("root.a", 1);
csConf.setNonLabeledQueueWeight("root.b", 2);
csConf.setNonLabeledQueueWeight("root.default", 3);
// Check queue info capacity
CapacityScheduler cs =
(CapacityScheduler)rm.getRMContext().getScheduler();
cs.reinitialize(csConf, rm.getRMContext());
LeafQueue a = (LeafQueue)
cs.getQueue("root.a");
Assert.assertNotNull(a);
Assert.assertEquals(a.getQueueCapacities().getWeight(),
a.getQueueInfo(false,
false).getWeight(), 1e-6);
LeafQueue b = (LeafQueue)
cs.getQueue("root.b");
Assert.assertNotNull(b);
Assert.assertEquals(b.getQueueCapacities().getWeight(),
b.getQueueInfo(false,
false).getWeight(), 1e-6);
rm.close();
}
private void internalTestContainerAllocationWithNodeLabel(
Configuration csConf) throws Exception {
/*
* Queue structure:
* root (*)

View File

@ -4905,7 +4905,7 @@ private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
float absCap, Resource res) {
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
null, QueueState.RUNNING, null, "", null, false, null, false);
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);