YARN-11474.The yarn queue list is displayed on the CLI (#5577)

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
This commit is contained in:
yl09099 2023-04-27 18:05:48 +08:00 committed by GitHub
parent eb749ddd4d
commit 245fde17d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 128 additions and 1 deletions

View File

@ -21,7 +21,9 @@
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
@ -43,6 +45,8 @@
public class QueueCLI extends YarnCLI {
public static final String QUEUE = "queue";
public static final String ALLTAG = "all";
public static void main(String[] args) throws Exception {
QueueCLI cli = new QueueCLI();
cli.setSysOutPrintStream(System.out);
@ -60,6 +64,11 @@ public int run(String[] args) throws Exception {
"List queue information about given queue.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(STATUS_CMD).setArgName("Queue Name");
opts.addOption(LIST_CMD, true,
"All child queues are displayed according to the parent queue. " +
"If the value is all, all queues are displayed.");
opts.getOption(LIST_CMD).setArgName("Parent Queue Name");
CommandLine cliParser = null;
try {
@ -79,6 +88,12 @@ public int run(String[] args) throws Exception {
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(opts);
return 0;
} else if (cliParser.hasOption(LIST_CMD)) {
if (args.length != 2) {
printUsage(opts);
return -1;
}
return listChildQueues(cliParser.getOptionValue(LIST_CMD));
} else {
syserr.println("Invalid Command Usage : ");
printUsage(opts);
@ -97,7 +112,7 @@ void printUsage(Options opts) {
}
/**
* Lists the Queue Information matching the given queue name
* Lists the Queue Information matching the given queue name.
*
* @param queueName
* @throws YarnException
@ -122,6 +137,40 @@ private int listQueue(String queueName) throws YarnException, IOException {
return rc;
}
/**
* List information about all child queues based on the parent queue.
* @param parentQueueName The name of the payment queue.
* @return The status code of execution.
* @throws IOException failed or interrupted I/O operations.
* @throws YarnException exceptions from yarn servers.
*/
private int listChildQueues(String parentQueueName) throws IOException, YarnException {
int exitCode;
PrintWriter writer = new PrintWriter(new OutputStreamWriter(
sysout, Charset.forName(StandardCharsets.UTF_8.name())));
if (parentQueueName.equalsIgnoreCase(ALLTAG)) {
List<QueueInfo> queueInfos = client.getAllQueues();
if (queueInfos != null) {
printQueueInfos(writer, queueInfos);
exitCode = 0;
} else {
writer.println("Cannot get any queues from RM,please check.");
exitCode = -1;
}
} else {
List<QueueInfo> childQueueInfos = client.getChildQueueInfos(parentQueueName);
if (childQueueInfos != null) {
printQueueInfos(writer, childQueueInfos);
exitCode = 0;
} else {
writer.println("Cannot get any queues under " + parentQueueName + " from RM,please check.");
exitCode = -1;
}
}
writer.flush();
return exitCode;
}
private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
writer.print("Queue Name : ");
writer.println(queueInfo.getQueueName());
@ -171,4 +220,29 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
writer.println(intraQueuePreemption ? "disabled" : "enabled");
}
}
private void printQueueInfos(PrintWriter writer, List<QueueInfo> queueInfos) {
writer.print(queueInfos.size() + " queues were found : \n");
writer.print("Queue Name\tQueue Path\tState\tCapacity\tCurrent Capacity" +
"\tMaximum Capacity\tWeight\tMaximum Parallel Apps\n");
for (QueueInfo queueInfo : queueInfos) {
writer.print(queueInfo.getQueueName());
writer.print("\t");
writer.print(queueInfo.getQueuePath());
writer.print("\t");
writer.print(queueInfo.getQueueState());
DecimalFormat df = new DecimalFormat("#.00");
writer.print("\t");
writer.print(df.format(queueInfo.getCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfo.getCurrentCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfo.getMaximumCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfo.getWeight()));
writer.print("\t");
writer.print(queueInfo.getMaxParallelApps());
writer.print("\n");
}
}
}

View File

@ -38,6 +38,7 @@
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@ -1748,6 +1749,58 @@ public void testGetQueueInfo() throws Exception {
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfos() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
Set<String> nodeLabels = new HashSet<String>();
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
List<QueueInfo> queueInfos = new ArrayList<>();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", "root.queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false);
queueInfos.add(queueInfo);
QueueInfo queueInfo1 = QueueInfo.newInstance("queueB", "root.queueB", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false);
queueInfos.add(queueInfo1);
QueueInfo queueInfo2 = QueueInfo.newInstance("queueC", "root.queueC", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false);
queueInfos.add(queueInfo2);
QueueInfo queueInfo3 = QueueInfo.newInstance("queueD", "root.queueD", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false);
queueInfos.add(queueInfo3);
when(client.getAllQueues()).thenReturn(queueInfos);
int result = cli.run(new String[] {"-list", "all"});
Assert.assertEquals(0, result);
verify(client).getAllQueues();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter writer = new PrintWriter(baos);
writer.print(queueInfos.size() + " queues were found : \n");
writer.print("Queue Name\tQueue Path\tState\tCapacity\tCurrent Capacity\t" +
"Maximum Capacity\tWeight\tMaximum Parallel Apps\n");
for (QueueInfo queueInfoe : queueInfos) {
writer.print(queueInfoe.getQueueName());
writer.print("\t");
writer.print(queueInfoe.getQueuePath());
writer.print("\t");
writer.print(queueInfoe.getQueueState());
DecimalFormat df = new DecimalFormat("#.00");
writer.print("\t");
writer.print(df.format(queueInfoe.getCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfoe.getCurrentCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfoe.getMaximumCapacity() * 100) + "%");
writer.print("\t");
writer.print(df.format(queueInfoe.getWeight()));
writer.print("\t");
writer.print(queueInfoe.getMaxParallelApps());
writer.print("\n");
}
writer.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();