From 245fde17d7286102bc9925a54e66e3e7b2cd006e Mon Sep 17 00:00:00 2001 From: yl09099 <33595968+yl09099@users.noreply.github.com> Date: Thu, 27 Apr 2023 18:05:48 +0800 Subject: [PATCH] YARN-11474.The yarn queue list is displayed on the CLI (#5577) Reviewed-by: Shilun Fan Signed-off-by: Shilun Fan --- .../hadoop/yarn/client/cli/QueueCLI.java | 76 ++++++++++++++++++- .../hadoop/yarn/client/cli/TestYarnCLI.java | 53 +++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java index 46facffd28..c8d71514e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/QueueCLI.java @@ -21,7 +21,9 @@ import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.text.DecimalFormat; +import java.util.List; import java.util.Set; import org.apache.commons.cli.CommandLine; @@ -43,6 +45,8 @@ public class QueueCLI extends YarnCLI { public static final String QUEUE = "queue"; + public static final String ALLTAG = "all"; + public static void main(String[] args) throws Exception { QueueCLI cli = new QueueCLI(); cli.setSysOutPrintStream(System.out); @@ -60,6 +64,11 @@ public int run(String[] args) throws Exception { "List queue information about given queue."); opts.addOption(HELP_CMD, false, "Displays help for all commands."); opts.getOption(STATUS_CMD).setArgName("Queue Name"); + opts.addOption(LIST_CMD, true, + "All child queues are displayed according to the parent queue. " + + "If the value is all, all queues are displayed."); + opts.getOption(LIST_CMD).setArgName("Parent Queue Name"); + CommandLine cliParser = null; try { @@ -79,6 +88,12 @@ public int run(String[] args) throws Exception { } else if (cliParser.hasOption(HELP_CMD)) { printUsage(opts); return 0; + } else if (cliParser.hasOption(LIST_CMD)) { + if (args.length != 2) { + printUsage(opts); + return -1; + } + return listChildQueues(cliParser.getOptionValue(LIST_CMD)); } else { syserr.println("Invalid Command Usage : "); printUsage(opts); @@ -97,7 +112,7 @@ void printUsage(Options opts) { } /** - * Lists the Queue Information matching the given queue name + * Lists the Queue Information matching the given queue name. * * @param queueName * @throws YarnException @@ -122,6 +137,40 @@ private int listQueue(String queueName) throws YarnException, IOException { return rc; } + /** + * List information about all child queues based on the parent queue. + * @param parentQueueName The name of the payment queue. + * @return The status code of execution. + * @throws IOException failed or interrupted I/O operations. + * @throws YarnException exceptions from yarn servers. + */ + private int listChildQueues(String parentQueueName) throws IOException, YarnException { + int exitCode; + PrintWriter writer = new PrintWriter(new OutputStreamWriter( + sysout, Charset.forName(StandardCharsets.UTF_8.name()))); + if (parentQueueName.equalsIgnoreCase(ALLTAG)) { + List queueInfos = client.getAllQueues(); + if (queueInfos != null) { + printQueueInfos(writer, queueInfos); + exitCode = 0; + } else { + writer.println("Cannot get any queues from RM,please check."); + exitCode = -1; + } + } else { + List childQueueInfos = client.getChildQueueInfos(parentQueueName); + if (childQueueInfos != null) { + printQueueInfos(writer, childQueueInfos); + exitCode = 0; + } else { + writer.println("Cannot get any queues under " + parentQueueName + " from RM,please check."); + exitCode = -1; + } + } + writer.flush(); + return exitCode; + } + private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) { writer.print("Queue Name : "); writer.println(queueInfo.getQueueName()); @@ -171,4 +220,29 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) { writer.println(intraQueuePreemption ? "disabled" : "enabled"); } } + + private void printQueueInfos(PrintWriter writer, List queueInfos) { + writer.print(queueInfos.size() + " queues were found : \n"); + writer.print("Queue Name\tQueue Path\tState\tCapacity\tCurrent Capacity" + + "\tMaximum Capacity\tWeight\tMaximum Parallel Apps\n"); + for (QueueInfo queueInfo : queueInfos) { + writer.print(queueInfo.getQueueName()); + writer.print("\t"); + writer.print(queueInfo.getQueuePath()); + writer.print("\t"); + writer.print(queueInfo.getQueueState()); + DecimalFormat df = new DecimalFormat("#.00"); + writer.print("\t"); + writer.print(df.format(queueInfo.getCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfo.getCurrentCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfo.getMaximumCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfo.getWeight())); + writer.print("\t"); + writer.print(queueInfo.getMaxParallelApps()); + writer.print("\n"); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index 22c7cc34bc..469cb2f72f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -38,6 +38,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.UnsupportedEncodingException; +import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -1748,6 +1749,58 @@ public void testGetQueueInfo() throws Exception { Assert.assertEquals(queueInfoStr, sysOutStream.toString()); } + @Test + public void testGetQueueInfos() throws Exception { + QueueCLI cli = createAndGetQueueCLI(); + Set nodeLabels = new HashSet(); + nodeLabels.add("GPU"); + nodeLabels.add("JDK_7"); + List queueInfos = new ArrayList<>(); + QueueInfo queueInfo = QueueInfo.newInstance("queueA", "root.queueA", 0.4f, 0.8f, 0.5f, + null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false); + queueInfos.add(queueInfo); + QueueInfo queueInfo1 = QueueInfo.newInstance("queueB", "root.queueB", 0.4f, 0.8f, 0.5f, + null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false); + queueInfos.add(queueInfo1); + QueueInfo queueInfo2 = QueueInfo.newInstance("queueC", "root.queueC", 0.4f, 0.8f, 0.5f, + null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false); + queueInfos.add(queueInfo2); + QueueInfo queueInfo3 = QueueInfo.newInstance("queueD", "root.queueD", 0.4f, 0.8f, 0.5f, + null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, -1.0f, 10, null, false); + queueInfos.add(queueInfo3); + when(client.getAllQueues()).thenReturn(queueInfos); + int result = cli.run(new String[] {"-list", "all"}); + Assert.assertEquals(0, result); + verify(client).getAllQueues(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter writer = new PrintWriter(baos); + writer.print(queueInfos.size() + " queues were found : \n"); + writer.print("Queue Name\tQueue Path\tState\tCapacity\tCurrent Capacity\t" + + "Maximum Capacity\tWeight\tMaximum Parallel Apps\n"); + for (QueueInfo queueInfoe : queueInfos) { + writer.print(queueInfoe.getQueueName()); + writer.print("\t"); + writer.print(queueInfoe.getQueuePath()); + writer.print("\t"); + writer.print(queueInfoe.getQueueState()); + DecimalFormat df = new DecimalFormat("#.00"); + writer.print("\t"); + writer.print(df.format(queueInfoe.getCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfoe.getCurrentCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfoe.getMaximumCapacity() * 100) + "%"); + writer.print("\t"); + writer.print(df.format(queueInfoe.getWeight())); + writer.print("\t"); + writer.print(queueInfoe.getMaxParallelApps()); + writer.print("\n"); + } + writer.close(); + String queueInfoStr = baos.toString("UTF-8"); + Assert.assertEquals(queueInfoStr, sysOutStream.toString()); + } + @Test public void testGetQueueInfoOverrideIntraQueuePreemption() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();