YARN-10658. CapacityScheduler QueueInfo add queue path field to avoid ambiguous QueueName. Contributed by Qi Zhu.
This commit is contained in:
parent
4a0b7f7ebe
commit
04cd3115ba
@ -54,7 +54,8 @@ public abstract class QueueInfo {
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static QueueInfo newInstance(String queueName, float capacity,
|
||||
public static QueueInfo newInstance(String queueName,
|
||||
String queuePath, float capacity,
|
||||
float maximumCapacity, float currentCapacity,
|
||||
List<QueueInfo> childQueues, List<ApplicationReport> applications,
|
||||
QueueState queueState, Set<String> accessibleNodeLabels,
|
||||
@ -62,6 +63,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
||||
boolean preemptionDisabled, float weight) {
|
||||
QueueInfo queueInfo = Records.newRecord(QueueInfo.class);
|
||||
queueInfo.setQueueName(queueName);
|
||||
queueInfo.setQueuePath(queuePath);
|
||||
queueInfo.setCapacity(capacity);
|
||||
queueInfo.setMaximumCapacity(maximumCapacity);
|
||||
queueInfo.setCurrentCapacity(currentCapacity);
|
||||
@ -78,14 +80,15 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static QueueInfo newInstance(String queueName, float capacity,
|
||||
public static QueueInfo newInstance(String queueName,
|
||||
String queuePath, float capacity,
|
||||
float maximumCapacity, float currentCapacity,
|
||||
List<QueueInfo> childQueues, List<ApplicationReport> applications,
|
||||
QueueState queueState, Set<String> accessibleNodeLabels,
|
||||
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
|
||||
boolean preemptionDisabled, float weight,
|
||||
Map<String, QueueConfigurations> queueConfigurations) {
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
|
||||
maximumCapacity, currentCapacity,
|
||||
childQueues, applications,
|
||||
queueState, accessibleNodeLabels,
|
||||
@ -97,7 +100,8 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public static QueueInfo newInstance(String queueName, float capacity,
|
||||
public static QueueInfo newInstance(String queueName,
|
||||
String queuePath, float capacity,
|
||||
float maximumCapacity, float currentCapacity,
|
||||
List<QueueInfo> childQueues, List<ApplicationReport> applications,
|
||||
QueueState queueState, Set<String> accessibleNodeLabels,
|
||||
@ -105,7 +109,7 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
||||
boolean preemptionDisabled, float weight,
|
||||
Map<String, QueueConfigurations> queueConfigurations,
|
||||
boolean intraQueuePreemptionDisabled) {
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, queuePath, capacity,
|
||||
maximumCapacity, currentCapacity,
|
||||
childQueues, applications,
|
||||
queueState, accessibleNodeLabels,
|
||||
@ -126,6 +130,18 @@ public static QueueInfo newInstance(String queueName, float capacity,
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueueName(String queueName);
|
||||
|
||||
/**
|
||||
* Get the <em>path</em> of the queue.
|
||||
* @return <em>path</em> of the queue
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
public abstract String getQueuePath();
|
||||
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueuePath(String queuePath);
|
||||
|
||||
/**
|
||||
* Get the <em>configured capacity</em> of the queue.
|
||||
|
@ -610,6 +610,7 @@ message QueueInfoProto {
|
||||
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
|
||||
optional bool intraQueuePreemptionDisabled = 13;
|
||||
optional float weight = 14;
|
||||
optional string queuePath = 15;
|
||||
}
|
||||
|
||||
message QueueConfigurationsProto {
|
||||
|
@ -125,6 +125,8 @@ private int listQueue(String queueName) throws YarnException, IOException {
|
||||
private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
|
||||
writer.print("Queue Name : ");
|
||||
writer.println(queueInfo.getQueueName());
|
||||
writer.print("Queue Path : ");
|
||||
writer.println(queueInfo.getQueuePath());
|
||||
|
||||
writer.print("\tState : ");
|
||||
writer.println(queueInfo.getQueueState());
|
||||
|
@ -668,7 +668,7 @@ public List<NodeReport> createFakeNodeReports() {
|
||||
}
|
||||
|
||||
public QueueInfo createFakeQueueInfo() {
|
||||
return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
|
||||
return QueueInfo.newInstance("root", "root", 100f, 100f, 50f, null,
|
||||
createFakeAppReports(), QueueState.RUNNING, null,
|
||||
null, null, false, -1.0f,
|
||||
null, false);
|
||||
|
@ -1718,7 +1718,9 @@ public void testGetQueueInfo() throws Exception {
|
||||
Set<String> nodeLabels = new HashSet<String>();
|
||||
nodeLabels.add("GPU");
|
||||
nodeLabels.add("JDK_7");
|
||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||
QueueInfo queueInfo = QueueInfo.
|
||||
newInstance("queueA", "root.queueA",
|
||||
0.4f, 0.8f, 0.5f,
|
||||
null, null, QueueState.RUNNING, nodeLabels,
|
||||
"GPU", null, false, -1.0f, null,
|
||||
false);
|
||||
@ -1730,6 +1732,7 @@ public void testGetQueueInfo() throws Exception {
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("Queue Information : ");
|
||||
pw.println("Queue Name : " + "queueA");
|
||||
pw.println("Queue Path : " + "root.queueA");
|
||||
pw.println("\tState : " + "RUNNING");
|
||||
pw.println("\tCapacity : " + "40.00%");
|
||||
pw.println("\tCurrent Capacity : " + "50.00%");
|
||||
@ -1888,7 +1891,9 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
|
||||
@Test
|
||||
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
||||
QueueCLI cli = createAndGetQueueCLI();
|
||||
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
|
||||
QueueInfo queueInfo = QueueInfo.
|
||||
newInstance("queueA", "root.queueA",
|
||||
0.4f, 0.8f, 0.5f,
|
||||
null, null, QueueState.RUNNING, null, null, null,
|
||||
true, -1.0f, null, true);
|
||||
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
|
||||
@ -1899,6 +1904,7 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("Queue Information : ");
|
||||
pw.println("Queue Name : " + "queueA");
|
||||
pw.println("Queue Path : " + "root.queueA");
|
||||
pw.println("\tState : " + "RUNNING");
|
||||
pw.println("\tCapacity : " + "40.00%");
|
||||
pw.println("\tCurrent Capacity : " + "50.00%");
|
||||
|
@ -101,6 +101,12 @@ public String getQueueName() {
|
||||
return (p.hasQueueName()) ? p.getQueueName() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueuePath() {
|
||||
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.hasQueuePath()) ? p.getQueuePath() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueState getQueueState() {
|
||||
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
|
||||
@ -166,6 +172,16 @@ public void setQueueName(String queueName) {
|
||||
builder.setQueueName(queueName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueuePath(String queuePath) {
|
||||
maybeInitBuilder();
|
||||
if (queuePath == null) {
|
||||
builder.clearQueuePath();
|
||||
return;
|
||||
}
|
||||
builder.setQueuePath(queuePath);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueueState(QueueState queueState) {
|
||||
maybeInitBuilder();
|
||||
|
@ -432,7 +432,8 @@ public static void setup() throws Exception {
|
||||
generateByNewInstance(ContainerUpdateResponse.class);
|
||||
// genByNewInstance does not apply to QueueInfo, cause
|
||||
// it is recursive(has sub queues)
|
||||
typeValueCache.put(QueueInfo.class, QueueInfo.newInstance("root", 1.0f,
|
||||
typeValueCache.put(QueueInfo.class, QueueInfo.
|
||||
newInstance("root", "root", 1.0f,
|
||||
1.0f, 0.1f, null, null, QueueState.RUNNING, ImmutableSet.of("x", "y"),
|
||||
"x && y", null, false, -1.0f, null, false));
|
||||
generateByNewInstance(QueueStatistics.class);
|
||||
|
@ -716,6 +716,7 @@ protected QueueInfo getQueueInfo() {
|
||||
// TODO, improve this
|
||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queueInfo.setQueueName(queueName);
|
||||
queueInfo.setQueuePath(queuePath);
|
||||
queueInfo.setAccessibleNodeLabels(accessibleLabels);
|
||||
queueInfo.setCapacity(queueCapacities.getCapacity());
|
||||
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
|
||||
|
@ -59,23 +59,26 @@ public void setUp() throws IOException {
|
||||
rmContext = mock(RMContext.class);
|
||||
scheduler = mock(MutableConfScheduler.class);
|
||||
when(rmContext.getScheduler()).thenReturn(scheduler);
|
||||
mockQueue("a", scheduler);
|
||||
mockQueue("b", scheduler);
|
||||
mockQueue("b1", scheduler);
|
||||
mockQueue("a", "root.a", scheduler);
|
||||
mockQueue("b", "root.b", scheduler);
|
||||
mockQueue("b1", "root.b1", scheduler);
|
||||
}
|
||||
|
||||
private void mockQueue(String queueName, MutableConfScheduler scheduler)
|
||||
private void mockQueue(String queueName,
|
||||
String queuePath, MutableConfScheduler confScheduler)
|
||||
throws IOException {
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null,
|
||||
QueueInfo queueInfo = QueueInfo.
|
||||
newInstance(queueName, queuePath, 0, 0,
|
||||
0, null, null,
|
||||
null, null, null, null, false, -1.0f, null, false);
|
||||
when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
|
||||
when(confScheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean()))
|
||||
.thenReturn(queueInfo);
|
||||
Queue queue = mock(Queue.class);
|
||||
when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(GOOD_USER)))
|
||||
.thenReturn(true);
|
||||
when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(BAD_USER)))
|
||||
.thenReturn(false);
|
||||
when(scheduler.getQueue(eq(queueName))).thenReturn(queue);
|
||||
when(confScheduler.getQueue(eq(queueName))).thenReturn(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -216,7 +216,8 @@ private Queue createQueue(String name, Queue parent) {
|
||||
|
||||
private Queue createQueue(String name, Queue parent, float capacity) {
|
||||
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(name,
|
||||
"root." + name, capacity, 1.0f, 0, null,
|
||||
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
|
||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||
Queue queue = mock(Queue.class);
|
||||
|
@ -1146,6 +1146,8 @@ public void testCapacitySchedulerInfo() throws Exception {
|
||||
QueueInfo queueInfo = resourceManager.getResourceScheduler().getQueueInfo("a", true, true);
|
||||
Assert.assertEquals("Queue Name should be a", "a",
|
||||
queueInfo.getQueueName());
|
||||
Assert.assertEquals("Queue Path should be root.a", "root.a",
|
||||
queueInfo.getQueuePath());
|
||||
Assert.assertEquals("Child Queues size should be 2", 2,
|
||||
queueInfo.getChildQueues().size());
|
||||
|
||||
@ -4362,12 +4364,16 @@ public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
|
||||
QueueInfo queueInfoA = cs.getQueueInfo("a", true, false);
|
||||
Assert.assertEquals("Queue Name should be a", "a",
|
||||
queueInfoA.getQueueName());
|
||||
Assert.assertEquals("Queue Path should be root.a", "root.a",
|
||||
queueInfoA.getQueuePath());
|
||||
Assert.assertEquals("Default Node Label Expression should be x", "x",
|
||||
queueInfoA.getDefaultNodeLabelExpression());
|
||||
|
||||
QueueInfo queueInfoB = cs.getQueueInfo("b", true, false);
|
||||
Assert.assertEquals("Queue Name should be b", "b",
|
||||
queueInfoB.getQueueName());
|
||||
Assert.assertEquals("Queue Path should be root.b", "root.b",
|
||||
queueInfoB.getQueuePath());
|
||||
Assert.assertEquals("Default Node Label Expression should be y", "y",
|
||||
queueInfoB.getDefaultNodeLabelExpression());
|
||||
}
|
||||
|
@ -876,6 +876,41 @@ public void testAutoCreateQueueAfterRemoval() throws Exception {
|
||||
b.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueInfoIfAmbiguousQueueNames() throws Exception {
|
||||
startScheduler();
|
||||
|
||||
AbstractCSQueue b = (AbstractCSQueue) cs.
|
||||
getQueue("root.b");
|
||||
Assert.assertFalse(b.isDynamicQueue());
|
||||
Assert.assertEquals("root.b",
|
||||
b.getQueueInfo().getQueuePath());
|
||||
|
||||
createQueue("root.a.b.b");
|
||||
|
||||
AbstractCSQueue bAutoParent = (AbstractCSQueue) cs.
|
||||
getQueue("root.a.b");
|
||||
Assert.assertTrue(bAutoParent.isDynamicQueue());
|
||||
Assert.assertTrue(bAutoParent.hasChildQueues());
|
||||
Assert.assertEquals("root.a.b",
|
||||
bAutoParent.getQueueInfo().getQueuePath());
|
||||
|
||||
AbstractCSQueue bAutoLeafQueue =
|
||||
(AbstractCSQueue) cs.getQueue("root.a.b.b");
|
||||
Assert.assertTrue(bAutoLeafQueue.isDynamicQueue());
|
||||
Assert.assertFalse(bAutoLeafQueue.hasChildQueues());
|
||||
Assert.assertEquals("root.a.b.b",
|
||||
bAutoLeafQueue.getQueueInfo().getQueuePath());
|
||||
|
||||
// Make sure all queue name are ambiguous
|
||||
Assert.assertEquals("b",
|
||||
b.getQueueInfo().getQueueName());
|
||||
Assert.assertEquals("b",
|
||||
bAutoParent.getQueueInfo().getQueueName());
|
||||
Assert.assertEquals("b",
|
||||
bAutoLeafQueue.getQueueInfo().getQueueName());
|
||||
}
|
||||
|
||||
protected LeafQueue createQueue(String queuePath) throws YarnException {
|
||||
return autoQueueHandler.autoCreateQueue(
|
||||
CSQueueUtils.extractQueuePath(queuePath));
|
||||
|
@ -4786,7 +4786,7 @@ public void testApplicationQueuePercent()
|
||||
|
||||
// Queue "test" consumes 100% of the cluster, so its capacity and absolute
|
||||
// capacity are both 1.0f.
|
||||
Queue queue = createQueue("test", null, 1.0f, 1.0f, res);
|
||||
Queue queue = createQueue("test", "root.test", null, 1.0f, 1.0f, res);
|
||||
final String user = "user1";
|
||||
FiCaSchedulerApp app =
|
||||
new FiCaSchedulerApp(appAttId, user, queue,
|
||||
@ -4803,7 +4803,7 @@ public void testApplicationQueuePercent()
|
||||
|
||||
// Queue "test2" is a child of root and its capacity is 50% of root. As a
|
||||
// child of root, its absolute capaicty is also 50%.
|
||||
queue = createQueue("test2", null, 0.5f, 0.5f,
|
||||
queue = createQueue("test2", "root.test2", null, 0.5f, 0.5f,
|
||||
Resources.divideAndCeil(dominantResourceCalculator, res, 2));
|
||||
app = new FiCaSchedulerApp(appAttId, user, queue,
|
||||
queue.getAbstractUsersManager(), rmContext);
|
||||
@ -4816,7 +4816,8 @@ public void testApplicationQueuePercent()
|
||||
|
||||
// Queue "test2.1" is 50% of queue "test2", which is 50% of the cluster.
|
||||
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
|
||||
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f,
|
||||
AbstractCSQueue qChild =
|
||||
createQueue("test2.1", "root.test2.1", queue, 0.5f, 0.25f,
|
||||
Resources.divideAndCeil(dominantResourceCalculator, res, 4));
|
||||
app = new FiCaSchedulerApp(appAttId, user, qChild,
|
||||
qChild.getAbstractUsersManager(), rmContext);
|
||||
@ -4828,7 +4829,7 @@ public void testApplicationQueuePercent()
|
||||
app.getResourceUsageReport().getClusterUsagePercentage(), 0.01f);
|
||||
|
||||
// test that queueUsagePercentage returns neither NaN nor Infinite
|
||||
AbstractCSQueue zeroQueue = createQueue("test2.2", null,
|
||||
AbstractCSQueue zeroQueue = createQueue("test2.2", "root.test2.2", null,
|
||||
Float.MIN_VALUE, Float.MIN_VALUE,
|
||||
Resources.multiply(res, Float.MIN_VALUE));
|
||||
app = new FiCaSchedulerApp(appAttId, user, zeroQueue,
|
||||
@ -4901,10 +4902,12 @@ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
|
||||
return attId;
|
||||
}
|
||||
|
||||
private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
|
||||
private AbstractCSQueue
|
||||
createQueue(String name, String path, Queue parent, float capacity,
|
||||
float absCap, Resource res) {
|
||||
CSQueueMetrics metrics = CSQueueMetrics.forQueue(name, parent, false, cs.getConf());
|
||||
QueueInfo queueInfo = QueueInfo.newInstance(name, capacity, 1.0f, 0, null,
|
||||
QueueInfo queueInfo = QueueInfo.
|
||||
newInstance(name, path, capacity, 1.0f, 0, null,
|
||||
null, QueueState.RUNNING, null, "", null, false, -1.0f, null, false);
|
||||
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
|
||||
AbstractCSQueue queue = mock(AbstractCSQueue.class);
|
||||
|
Loading…
Reference in New Issue
Block a user