YARN-11484. [Federation] Router Supports Yarn Client CLI Cmds. (#6132)

This commit is contained in:
slfan1989 2023-11-08 10:38:35 +08:00 committed by GitHub
parent d9a6792ca9
commit 90e9aa272e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 777 additions and 15 deletions

View File

@ -399,6 +399,12 @@ public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo(
return client.getQueueInfo(queueName);
}
@Override
public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo(
String queueName, String subClusterId) throws YarnException, IOException {
return client.getQueueInfo(queueName, subClusterId);
}
@Override
public List<org.apache.hadoop.yarn.api.records.QueueInfo> getAllQueues()
throws YarnException, IOException {

View File

@ -46,6 +46,19 @@ public abstract class GetQueueInfoRequest {
return request;
}
@Public
@Stable
public static GetQueueInfoRequest newInstance(String queueName, boolean includeApplications,
boolean includeChildQueues, boolean recursive, String subClusterId) {
GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
request.setQueueName(queueName);
request.setIncludeApplications(includeApplications);
request.setIncludeChildQueues(includeChildQueues);
request.setRecursive(recursive);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the <em>queue name</em> for which to get queue information.
* @return <em>queue name</em> for which to get queue information
@ -114,5 +127,21 @@ public abstract class GetQueueInfoRequest {
@Public
@Stable
public abstract void setRecursive(boolean recursive);
/**
* Get SubClusterId.
* @return SubClusterId.
*/
@Public
@Stable
public abstract String getSubClusterId();
/**
* Set SubClusterId.
* @param subClusterId SubClusterId.
*/
@Public
@Stable
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -340,4 +340,177 @@ public abstract void setQueueConfigurations(
@Unstable
public abstract void setIntraQueuePreemptionDisabled(
boolean intraQueuePreemptionDisabled);
/**
* Get Scheduler type.
*
* @return SchedulerType.
*/
@Public
@Stable
public abstract String getSchedulerType();
/**
* Set Scheduler type.
* @param schedulerType scheduler Type.
*/
@Private
@Unstable
public abstract void setSchedulerType(String schedulerType);
/**
* Get the minimum resource VCore.
* @return minimum resource VCore.
*/
@Public
@Stable
public abstract int getMinResourceVCore();
/**
* Set the minimum resource VCore.
* @param vCore minimum resource VCore.
*/
@Private
@Unstable
public abstract void setMinResourceVCore(int vCore);
/**
* Get the minimum resource Memory.
* @return minimum resource Memory.
*/
@Public
@Stable
public abstract long getMinResourceMemory();
/**
* Set the minimum resource Memory.
* @param memory minimum resource Memory.
*/
@Private
@Unstable
public abstract void setMinResourceMemory(long memory);
/**
* Get the maximum resource VCore.
* @return maximum resource VCore.
*/
@Public
@Stable
public abstract int getMaxResourceVCore();
/**
* Set the maximum resource Memory.
* @param vCore maximum resource VCore.
*/
@Private
@Unstable
public abstract void setMaxResourceVCore(int vCore);
/**
* Get the maximum resource Memory.
* @return maximum resource Memory.
*/
@Public
@Stable
public abstract long getMaxResourceMemory();
/**
* Set the maximum resource Memory.
* @param memory maximum resource Memory.
*/
@Private
@Unstable
public abstract void setMaxResourceMemory(long memory);
/**
* Get the reserved resource VCore.
* @return reserved resource VCore.
*/
@Public
@Stable
public abstract int getReservedResourceVCore();
/**
* Set the reserved resource VCore.
* @param vCore reserved resource VCore.
*/
@Private
@Unstable
public abstract void setReservedResourceVCore(int vCore);
/**
* Get the reserved resource Memory.
* @return reserved resource Memory.
*/
@Public
@Stable
public abstract long getReservedResourceMemory();
/**
* Set the reserved resource Memory.
* @param memory reserved resource Memory.
*/
@Private
@Unstable
public abstract void setReservedResourceMemory(long memory);
/**
* Get the SteadyFairShare VCore.
* @return SteadyFairShare VCore.
*/
@Public
@Stable
public abstract int getSteadyFairShareVCore();
/**
* Set the SteadyFairShare VCore.
* @param vCore SteadyFairShare VCore.
*/
@Private
@Unstable
public abstract void setSteadyFairShareVCore(int vCore);
/**
* Get the SteadyFairShare Memory.
* @return SteadyFairShare Memory.
*/
@Public
@Stable
public abstract long getSteadyFairShareMemory();
/**
* Set the SteadyFairShare Memory.
* @param memory SteadyFairShare Memory.
*/
@Private
@Unstable
public abstract void setSteadyFairShareMemory(long memory);
/**
* Get the SubClusterId.
* @return the SubClusterId.
*/
@Public
@Stable
public abstract String getSubClusterId();
/**
* Set the SubClusterId.
* @param subClusterId the SubClusterId.
*/
@Private
@Unstable
public abstract void setSubClusterId(String subClusterId);
/**
* Get the MaxRunningApp.
* @return The number of MaxRunningApp.
*/
@Public
@Stable
public abstract int getMaxRunningApp();
@Private
@Unstable
public abstract void setMaxRunningApp(int maxRunningApp);
}

View File

@ -636,6 +636,17 @@ message QueueInfoProto {
optional float weight = 14;
optional string queuePath = 15;
optional int32 maxParallelApps = 16;
optional string schedulerType = 17;
optional int32 minResourceVCore = 18;
optional int64 minResourceMemory = 19;
optional int32 maxResourceVCore = 20;
optional int64 maxResourceMemory = 21;
optional int32 reservedResourceVCore = 22;
optional int64 reservedResourceMemory = 23;
optional int32 steadyFairShareVCore = 24;
optional int64 steadyFairShareMemory = 25;
optional string subClusterId = 26;
optional int32 maxRunningApp = 27;
}
message QueueConfigurationsProto {

View File

@ -232,6 +232,7 @@ message GetQueueInfoRequestProto {
optional bool includeApplications = 2;
optional bool includeChildQueues = 3;
optional bool recursive = 4;
optional string subClusterId = 5;
}
message GetQueueInfoResponseProto {

View File

@ -464,11 +464,28 @@ public abstract Token getRMDelegationToken(Text renewer)
* @throws YarnException
* in case of errors or if YARN rejects the request due to
* access-control restrictions.
* @throws IOException
* @throws IOException I/O exception has occurred.
*/
public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
IOException;
/**
* <p>
* Get information ({@link QueueInfo}) about a given <em>queue</em>.
* </p>
*
* @param queueName
* Name of the queue whose information is needed.
* @param subClusterId sub-cluster Id.
* @return queue information.
* @throws YarnException
* in case of errors or if YARN rejects the request due to
* access-control restrictions.
* @throws IOException I/O exception has occurred.
*/
public abstract QueueInfo getQueueInfo(String queueName, String subClusterId)
throws YarnException, IOException;
/**
* <p>
* Get information ({@link QueueInfo}) about all queues, recursively if there

View File

@ -735,6 +735,18 @@ public Token getRMDelegationToken(Text renewer)
return request;
}
private GetQueueInfoRequest getQueueInfoRequest(
String queueName, String subClusterId, boolean includeApplications,
boolean includeChildQueues, boolean recursive) {
GetQueueInfoRequest request = Records.newRecord(GetQueueInfoRequest.class);
request.setQueueName(queueName);
request.setSubClusterId(subClusterId);
request.setIncludeApplications(includeApplications);
request.setIncludeChildQueues(includeChildQueues);
request.setRecursive(recursive);
return request;
}
@Override
public QueueInfo getQueueInfo(String queueName) throws YarnException,
IOException {
@ -744,6 +756,15 @@ public QueueInfo getQueueInfo(String queueName) throws YarnException,
return rmClient.getQueueInfo(request).getQueueInfo();
}
@Override
public QueueInfo getQueueInfo(String queueName,
String subClusterId) throws YarnException, IOException {
GetQueueInfoRequest request =
getQueueInfoRequest(queueName, subClusterId, true, false, false);
Records.newRecord(GetQueueInfoRequest.class);
return rmClient.getQueueInfo(request).getQueueInfo();
}
@Override
public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
IOException {

View File

@ -32,6 +32,7 @@
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ToolRunner;
@ -42,6 +43,8 @@
import org.apache.hadoop.classification.VisibleForTesting;
import static org.apache.hadoop.yarn.client.util.YarnClientUtils.isYarnFederationEnabled;
@Private
@Unstable
public class QueueCLI extends YarnCLI {
@ -70,7 +73,8 @@ public int run(String[] args) throws Exception {
"All child queues are displayed according to the parent queue. " +
"If the value is all, all queues are displayed.");
opts.getOption(LIST_CMD).setArgName("Parent Queue Name");
opts.addOption(OPTION_SUBCLUSTERID, true, "We support setting subClusterId in " +
"YARN Federation mode to specify specific subClusters.");
CommandLine cliParser = null;
try {
@ -82,11 +86,19 @@ public int run(String[] args) throws Exception {
}
createAndStartYarnClient();
if (cliParser.hasOption(STATUS_CMD)) {
if (args.length != 2) {
// Our possible options are -status root.a,
// -subcluster sc-1, we will have up to 4 args
if (args.length > 4) {
printUsage(opts);
return -1;
}
return listQueue(cliParser.getOptionValue(STATUS_CMD));
String queue = cliParser.getOptionValue(STATUS_CMD);
String subClusterId = cliParser.getOptionValue(OPTION_SUBCLUSTERID);
if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) {
return listQueue(queue, subClusterId);
} else {
return listQueue(queue);
}
} else if (cliParser.hasOption(HELP_CMD)) {
printUsage(opts);
return 0;
@ -116,9 +128,10 @@ void printUsage(Options opts) {
/**
* Lists the Queue Information matching the given queue name.
*
* @param queueName
* @throws YarnException
* @throws IOException
* @param queueName Queue name to be queried.
* @throws YarnException YarnException indicates exceptions from yarn servers.
* @throws IOException I/O exception has occurred.
* @return 0, the command execution is successful; -1, the command execution fails.
*/
private int listQueue(String queueName) throws YarnException, IOException {
int rc;
@ -127,6 +140,9 @@ private int listQueue(String queueName) throws YarnException, IOException {
QueueInfo queueInfo = client.getQueueInfo(queueName);
if (queueInfo != null) {
if (isYarnFederationEnabled(getConf())) {
writer.println("Using YARN Federation mode.");
}
writer.println("Queue Information : ");
printQueueInfo(writer, queueInfo);
rc = 0;
@ -139,6 +155,41 @@ private int listQueue(String queueName) throws YarnException, IOException {
return rc;
}
/**
* Lists the Queue Information matching the given queue name.
*
* @param queueName Queue name to be queried.
* @param subClusterId Subcluster id.
* @throws YarnException YarnException indicates exceptions from yarn servers.
* @throws IOException I/O exception has occurred.
* @return 0, the command execution is successful; -1, the command execution fails.
*/
private int listQueue(String queueName, String subClusterId)
throws YarnException, IOException {
int rc;
PrintWriter writer = new PrintWriter(
new OutputStreamWriter(sysout, Charset.forName("UTF-8")));
QueueInfo queueInfo = client.getQueueInfo(queueName, subClusterId);
if (queueInfo != null) {
if (isYarnFederationEnabled(getConf())) {
writer.println("Using YARN Federation mode.");
}
if (StringUtils.isNotBlank(subClusterId)) {
writer.println("SubClusterId : " + subClusterId + ", Queue Information : ");
} else {
writer.println("Queue Information : ");
}
printQueueInfo(writer, queueInfo);
rc = 0;
} else {
writer.println("Cannot get queue from RM by queueName = " + queueName
+ ", subClusterId = " + subClusterId + " please check.");
rc = -1;
}
writer.flush();
return rc;
}
/**
* List information about all child queues based on the parent queue.
* @param parentQueueName The name of the payment queue.
@ -174,6 +225,66 @@ private int listChildQueues(String parentQueueName) throws IOException, YarnExce
}
private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
String schedulerType = queueInfo.getSchedulerType();
if (StringUtils.equals("FairScheduler", schedulerType)) {
printFairSchedulerQueue(writer, queueInfo);
} else {
printQueue(writer, queueInfo);
}
}
/**
* Print Queue information of FairScheduler.
*
* @param writer PrintWriter.
* @param queueInfo Queue Information.
*/
private void printFairSchedulerQueue(PrintWriter writer, QueueInfo queueInfo) {
String generateQueueInfoMessage = generateQueueInfoMessage(queueInfo);
writer.print(generateQueueInfoMessage);
}
private String generateQueueInfoMessage(QueueInfo queueInfo) {
StringBuilder stringBuilder = new StringBuilder();
if (queueInfo.getSchedulerType() != null) {
stringBuilder.append("Scheduler Name : ").append(queueInfo.getSchedulerType()).append("\n");
}
stringBuilder.append("Queue Name : ").append(queueInfo.getQueueName()).append("\n");
DecimalFormat df = new DecimalFormat("0.00");
stringBuilder.append("\tWeight : ").append(df.format(queueInfo.getWeight())).append("\n");
stringBuilder.append("\tState : ").append(queueInfo.getQueueState()).append("\n");
stringBuilder.append("\tMinResource : ").append("<memory : ")
.append(queueInfo.getMinResourceMemory()).append(", vCores:")
.append(queueInfo.getMinResourceVCore()).append(">").append("\n");
stringBuilder.append("\tMaxResource : ").append("<memory : ")
.append(queueInfo.getMaxResourceMemory()).append(", vCores:")
.append(queueInfo.getMaxResourceVCore()).append(">").append("\n");
stringBuilder.append("\tReservedResource : ").append("<memory : ")
.append(queueInfo.getReservedResourceMemory()).append(", vCores:")
.append(queueInfo.getReservedResourceVCore()).append(">").append("\n");
stringBuilder.append("\tSteadyFairShare : ").append("<memory : ")
.append(queueInfo.getSteadyFairShareMemory()).append(", vCores:")
.append(queueInfo.getSteadyFairShareVCore()).append(">").append("\n");
Boolean queuePreemption = queueInfo.getPreemptionDisabled();
if (queuePreemption != null) {
stringBuilder.append("\tQueue Preemption : ")
.append(queuePreemption ? "enabled" : "disabled").append("\n");
}
return stringBuilder.toString();
}
/**
* Print Queue information.
*
* @param writer PrintWriter.
* @param queueInfo Queue Information.
*/
private void printQueue(PrintWriter writer, QueueInfo queueInfo) {
if (queueInfo.getSchedulerType() != null) {
writer.print("Scheduler Name : ");
writer.println(queueInfo.getSchedulerType());
}
writer.print("Queue Name : ");
writer.println(queueInfo.getQueueName());
writer.print("Queue Path : ");
@ -208,7 +319,7 @@ private void printQueueInfo(PrintWriter writer, QueueInfo queueInfo) {
}
labelList.append(nodeLabel);
}
writer.println(labelList.toString());
writer.println(labelList);
Boolean preemptStatus = queueInfo.getPreemptionDisabled();
if (preemptStatus != null) {

View File

@ -37,6 +37,7 @@ public abstract class YarnCLI extends Configured implements Tool {
public static final String MOVE_TO_QUEUE_CMD = "movetoqueue";
public static final String HELP_CMD = "help";
public static final String SIGNAL_CMD = "signal";
public static final String OPTION_SUBCLUSTERID = "subClusterId";
protected PrintStream sysout;
protected PrintStream syserr;
protected YarnClient client;

View File

@ -250,4 +250,10 @@ public String run() throws Exception {
});
return challenge;
}
public static boolean isYarnFederationEnabled(Configuration conf) {
boolean isEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
return isEnabled;
}
}

View File

@ -88,6 +88,8 @@
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -1969,6 +1971,98 @@ public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfoWithFairScheduler() throws Exception {
// In this test case, we will simulate the queue information of fairScheduler
// and check the results of the queue information.
QueueCLI cli = createAndGetQueueCLI();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName("queueA");
queueInfo.setSchedulerType("FairScheduler");
queueInfo.setQueueState(QueueState.RUNNING);
queueInfo.setCapacity(0.3f);
queueInfo.setCurrentCapacity(0.1f);
queueInfo.setWeight(0.3f);
queueInfo.setMinResourceVCore(1);
queueInfo.setMinResourceMemory(1024);
queueInfo.setMaxResourceVCore(10);
queueInfo.setMaxResourceMemory(8192);
queueInfo.setReservedResourceVCore(0);
queueInfo.setReservedResourceMemory(0);
queueInfo.setSteadyFairShareVCore(10);
queueInfo.setSteadyFairShareMemory(8192);
queueInfo.setMaxRunningApp(10);
queueInfo.setPreemptionDisabled(true);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[]{"-status", "queueA"});
assertEquals(0, result);
verify(client).getQueueInfo("queueA");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Queue Information : ");
pw.println("Scheduler Name : FairScheduler");
pw.println("Queue Name : queueA");
pw.println("\tWeight : 0.30");
pw.println("\tState : RUNNING");
pw.println("\tMinResource : <memory : 0, vCores:10>");
pw.println("\tMaxResource : <memory : 8192, vCores:0>");
pw.println("\tReservedResource : <memory : 0, vCores:0>");
pw.println("\tSteadyFairShare : <memory : 8192, vCores:10>");
pw.println("\tQueue Preemption : enabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfoWithFairSchedulerAndSubClusterId() throws Exception {
// In this test case,
// we simulated printing FairScheduler queue information in YARN Federation mode.
QueueCLI cli = createAndGetQueueCLI();
Configuration config = new Configuration();
config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
cli.setConf(config);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName("queueA");
queueInfo.setSchedulerType("FairScheduler");
queueInfo.setQueueState(QueueState.RUNNING);
queueInfo.setCapacity(0.3f);
queueInfo.setCurrentCapacity(0.1f);
queueInfo.setWeight(0.3f);
queueInfo.setMinResourceVCore(1);
queueInfo.setMinResourceMemory(1024);
queueInfo.setMaxResourceVCore(10);
queueInfo.setMaxResourceMemory(8192);
queueInfo.setReservedResourceVCore(0);
queueInfo.setReservedResourceMemory(0);
queueInfo.setSteadyFairShareVCore(10);
queueInfo.setSteadyFairShareMemory(8192);
queueInfo.setMaxRunningApp(10);
queueInfo.setPreemptionDisabled(true);
when(client.getQueueInfo(any(String.class), any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[]{"-status", "queueA", "-subClusterId", "SC-1"});
assertEquals(0, result);
verify(client).getQueueInfo("queueA", "SC-1");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("Using YARN Federation mode.");
pw.println("SubClusterId : SC-1, Queue Information : ");
pw.println("Scheduler Name : FairScheduler");
pw.println("Queue Name : " + "queueA");
pw.println("\tWeight : " + "0.30");
pw.println("\tState : " + "RUNNING");
pw.println("\tMinResource : " + "<memory : 0, vCores:10>");
pw.println("\tMaxResource : " + "<memory : 8192, vCores:0>");
pw.println("\tReservedResource : " + "<memory : 0, vCores:0>");
pw.println("\tSteadyFairShare : " + "<memory : 8192, vCores:10>");
pw.println("\tQueue Preemption : " + "enabled");
pw.close();
String queueInfoStr = baos.toString("UTF-8");
Assert.assertEquals(queueInfoStr, sysOutStream.toString());
}
@Test
public void testGetQueueInfoWithNonExistedQueue() throws Exception {

View File

@ -96,6 +96,22 @@ public void setRecursive(boolean recursive) {
builder.setRecursive(recursive);
}
@Override
public String getSubClusterId() {
GetQueueInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : "";
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = GetQueueInfoRequestProto.newBuilder(proto);

View File

@ -553,4 +553,136 @@ public void setIntraQueuePreemptionDisabled(
maybeInitBuilder();
builder.setIntraQueuePreemptionDisabled(intraQueuePreemptionDisabled);
}
@Override
public String getSchedulerType() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSchedulerType()) ? p.getSchedulerType() : null;
}
@Override
public void setSchedulerType(String schedulerType) {
maybeInitBuilder();
builder.setSchedulerType(schedulerType);
}
@Override
public int getMinResourceVCore() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMinResourceVCore()) ? p.getMinResourceVCore() : 0;
}
@Override
public void setMinResourceVCore(int vCore) {
maybeInitBuilder();
builder.setMinResourceVCore(vCore);
}
@Override
public long getMinResourceMemory() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxResourceVCore()) ? p.getMaxResourceVCore() : 0;
}
@Override
public void setMinResourceMemory(long memory) {
maybeInitBuilder();
builder.setMinResourceMemory(memory);
}
@Override
public int getMaxResourceVCore() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxResourceVCore()) ? p.getMaxResourceVCore() : 0;
}
@Override
public void setMaxResourceVCore(int vCore) {
maybeInitBuilder();
builder.setMinResourceVCore(vCore);
}
@Override
public long getMaxResourceMemory() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxResourceMemory()) ? p.getMaxResourceMemory() : 0;
}
@Override
public void setMaxResourceMemory(long memory) {
maybeInitBuilder();
builder.setMaxResourceMemory(memory);
}
@Override
public int getReservedResourceVCore() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasReservedResourceVCore()) ? p.getReservedResourceVCore() : 0;
}
@Override
public void setReservedResourceVCore(int vCore) {
maybeInitBuilder();
builder.setReservedResourceVCore(vCore);
}
@Override
public long getReservedResourceMemory() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasReservedResourceMemory()) ? p.getReservedResourceMemory() : 0;
}
@Override
public void setReservedResourceMemory(long memory) {
maybeInitBuilder();
builder.setReservedResourceMemory(memory);
}
@Override
public int getSteadyFairShareVCore() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSteadyFairShareVCore()) ? p.getSteadyFairShareVCore() : 0;
}
@Override
public void setSteadyFairShareVCore(int vCore) {
maybeInitBuilder();
builder.setSteadyFairShareVCore(vCore);
}
@Override
public long getSteadyFairShareMemory() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSteadyFairShareMemory()) ? p.getSteadyFairShareMemory() : 0;
}
@Override
public void setSteadyFairShareMemory(long memory) {
maybeInitBuilder();
builder.setSteadyFairShareMemory(memory);
}
@Override
public String getSubClusterId() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
builder.setSubClusterId(subClusterId);
}
@Override
public int getMaxRunningApp() {
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxRunningApp()) ? p.getMaxRunningApp() : 0;
}
@Override
public void setMaxRunningApp(int maxRunningApp) {
maybeInitBuilder();
builder.setMaxRunningApp(maxRunningApp);
}
}

View File

@ -39,6 +39,7 @@ private CSQueueInfoProvider() {
public static QueueInfo getQueueInfo(AbstractCSQueue csQueue) {
QueueInfo queueInfo = RECORD_FACTORY.newRecordInstance(QueueInfo.class);
queueInfo.setSchedulerType("CapacityScheduler");
queueInfo.setQueueName(csQueue.getQueuePathObject().getLeafName());
queueInfo.setQueuePath(csQueue.getQueuePathObject().getFullPath());
queueInfo.setAccessibleNodeLabels(csQueue.getAccessibleNodeLabels());

View File

@ -234,6 +234,7 @@ public Priority getPriority() {
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setSchedulerType("FairScheduler");
queueInfo.setQueueName(getQueueName());
if (scheduler.getClusterResource().getMemorySize() == 0) {
@ -250,7 +251,37 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
getFairShare().getMemorySize());
}
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
// set Weight
queueInfo.setWeight(getWeight());
// set MinShareResource
Resource minShareResource = getMinShare();
queueInfo.setMinResourceVCore(minShareResource.getVirtualCores());
queueInfo.setMinResourceMemory(minShareResource.getMemorySize());
// set MaxShareResource
Resource maxShareResource =
Resources.componentwiseMin(getMaxShare(), scheduler.getClusterResource());
queueInfo.setMaxResourceVCore(maxShareResource.getVirtualCores());
queueInfo.setMaxResourceMemory(maxShareResource.getMemorySize());
// set ReservedResource
Resource newReservedResource = getReservedResource();
queueInfo.setReservedResourceVCore(newReservedResource.getVirtualCores());
queueInfo.setReservedResourceMemory(newReservedResource.getMemorySize());
// set SteadyFairShare
Resource newSteadyFairShare = getSteadyFairShare();
queueInfo.setSteadyFairShareVCore(newSteadyFairShare.getVirtualCores());
queueInfo.setSteadyFairShareMemory(newSteadyFairShare.getMemorySize());
// set MaxRunningApp
queueInfo.setMaxRunningApp(getMaxRunningApps());
// set Preemption
queueInfo.setPreemptionDisabled(isPreemptable());
ArrayList<QueueInfo> childQueueInfos = new ArrayList<>();
if (includeChildQueues) {
Collection<FSQueue> childQueues = getChildQueues();
for (FSQueue child : childQueues) {

View File

@ -27,6 +27,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -902,6 +903,40 @@ <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
return results.values();
}
<R> Collection<R> invoke(ClientMethod request, Class<R> clazz, String subClusterId)
throws YarnException {
// Get Active SubClusters
Map<SubClusterId, SubClusterInfo> subClusterInfoMap = federationFacade.getSubClusters(true);
// According to subCluster of string type, convert to SubClusterId type
SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId);
// If the provided subCluster is not Active or does not exist,
// an exception will be returned directly.
if (!subClusterInfoMap.containsKey(subClusterIdKey)) {
throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster.");
}
try {
ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterIdKey);
String methodName = request.getMethodName();
Class<?>[] types = request.getTypes();
Object[] params = request.getParams();
Method method = ApplicationClientProtocol.class.getMethod(methodName, types);
Object result = method.invoke(protocol, params);
if (result != null) {
return Collections.singletonList(clazz.cast(result));
}
} catch (Exception e) {
throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
subClusterId, e);
}
throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
subClusterId);
}
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
@ -933,6 +968,21 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throw new YarnException("Unable to get cluster nodes.");
}
/**
* <p>The interface used by clients to get information about <em>queues</em>
* from the <code>ResourceManager</code>.</p>
*
* <p>The client, via {@link GetQueueInfoRequest}, can ask for details such
* as used/total resources, child queues, running applications etc.</p>
*
* <p> In secure mode,the <code>ResourceManager</code> verifies access before
* providing the information.</p>
*
* @param request request to get queue information
* @return queue information
* @throws YarnException exceptions from yarn servers.
* @throws IOException io error occur.
*/
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
@ -943,13 +993,18 @@ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
TARGET_CLIENT_RM_SERVICE, msg);
RouterServerUtil.logAndThrowException(msg, null);
}
String rSubCluster = request.getSubClusterId();
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getQueueInfo",
new Class[]{GetQueueInfoRequest.class}, new Object[]{request});
Collection<GetQueueInfoResponse> queues = null;
try {
queues = invokeConcurrent(remoteMethod, GetQueueInfoResponse.class);
if (StringUtils.isNotBlank(rSubCluster)) {
queues = invoke(remoteMethod, GetQueueInfoResponse.class, rSubCluster);
} else {
queues = invokeConcurrent(remoteMethod, GetQueueInfoResponse.class);
}
} catch (Exception ex) {
routerMetrics.incrGetQueueInfoFailedRetrieved();
String msg = "Unable to get queue [" + request.getQueueName() + "] to exception.";

View File

@ -423,6 +423,47 @@ public static GetQueueInfoResponse mergeQueues(
if (queueInfo.getAccessibleNodeLabels() != null) {
accessibleNodeLabels.addAll(queueInfo.getAccessibleNodeLabels());
}
// set min resourceVCore
queueInfo.setMinResourceVCore(queueInfo.getMinResourceVCore() +
response.getQueueInfo().getMinResourceVCore());
// set min resourceMemory
queueInfo.setMinResourceMemory(queueInfo.getMinResourceMemory() +
response.getQueueInfo().getMinResourceMemory());
// set max resourceVCore
queueInfo.setMinResourceVCore(queueInfo.getMaxResourceVCore() +
response.getQueueInfo().getMaxResourceVCore());
// set max resourceMemory
queueInfo.setMinResourceMemory(queueInfo.getMaxResourceMemory() +
response.getQueueInfo().getMaxResourceMemory());
// set reserved resourceVCore
queueInfo.setReservedResourceVCore(queueInfo.getReservedResourceVCore() +
response.getQueueInfo().getMaxResourceVCore());
// set reserved resourceMemory
queueInfo.setReservedResourceMemory(queueInfo.getReservedResourceMemory() +
response.getQueueInfo().getMaxResourceMemory());
// set maxRunningApp
queueInfo.setMaxRunningApp(queueInfo.getMaxRunningApp() +
response.getQueueInfo().getMaxRunningApp());
// set steadyFairShareVCore
queueInfo.setSteadyFairShareVCore(queueInfo.getSteadyFairShareVCore() +
response.getQueueInfo().getSteadyFairShareVCore());
// set steadyFairShareMemory
queueInfo.setSteadyFairShareMemory(queueInfo.getSteadyFairShareMemory() +
response.getQueueInfo().getSteadyFairShareMemory());
// set Weight
queueInfo.setWeight(queueInfo.getWeight() +
response.getQueueInfo().getWeight());
if (response.getQueueInfo() != null) {
accessibleNodeLabels.addAll(response.getQueueInfo().getAccessibleNodeLabels());
}

View File

@ -1167,11 +1167,27 @@ public void testGetQueueInfo() throws Exception {
QueueInfo queueInfo = response.getQueueInfo();
Assert.assertNotNull(queueInfo);
Assert.assertEquals(queueInfo.getQueueName(), "root");
Assert.assertEquals(queueInfo.getCapacity(), 4.0, 0);
Assert.assertEquals(queueInfo.getCurrentCapacity(), 0.0, 0);
Assert.assertEquals(queueInfo.getChildQueues().size(), 12, 0);
Assert.assertEquals(queueInfo.getAccessibleNodeLabels().size(), 1);
Assert.assertEquals("root", queueInfo.getQueueName());
Assert.assertEquals(4.0, queueInfo.getCapacity(), 0);
Assert.assertEquals(0.0, queueInfo.getCurrentCapacity(), 0);
Assert.assertEquals(12, queueInfo.getChildQueues().size(), 0);
Assert.assertEquals(1, queueInfo.getAccessibleNodeLabels().size());
}
@Test
public void testSubClusterGetQueueInfo() throws IOException, YarnException {
// We have set up a unit test where we access queue information for subcluster1.
GetQueueInfoResponse response = interceptor.getQueueInfo(
GetQueueInfoRequest.newInstance("root", true, true, true, "1"));
Assert.assertNotNull(response);
QueueInfo queueInfo = response.getQueueInfo();
Assert.assertNotNull(queueInfo);
Assert.assertEquals("root", queueInfo.getQueueName());
Assert.assertEquals(1.0, queueInfo.getCapacity(), 0);
Assert.assertEquals(0.0, queueInfo.getCurrentCapacity(), 0);
Assert.assertEquals(3, queueInfo.getChildQueues().size(), 0);
Assert.assertEquals(1, queueInfo.getAccessibleNodeLabels().size());
}
@Test