diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 1ad77e0b30..916dcd50aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; @Private public interface ResourceManagerAdministrationProtocol extends GetUserMappingsProtocol { @@ -194,7 +196,7 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr /** * In YARN-Federation mode, this method provides a way to save queue policies in batches. * - * @param request BatchSaveFederationQueuePolicies Request + * @param request BatchSaveFederationQueuePolicies Request. * @return Response from batchSaveFederationQueuePolicies. * @throws YarnException exceptions from yarn servers. * @throws IOException if an IO error occurred. @@ -203,4 +205,17 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Idempotent BatchSaveFederationQueuePoliciesResponse batchSaveFederationQueuePolicies( BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException; + + /** + * In YARN-Federation mode, this method provides a way to list policies. + * + * @param request QueryFederationQueuePoliciesRequest Request. + * @return Response from listFederationQueuePolicies. + * @throws YarnException exceptions from yarn servers. + * @throws IOException if an IO error occurred. + */ + @Private + @Idempotent + QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesRequest.java new file mode 100644 index 0000000000..7572ff495a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesRequest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * Request for querying Federation Queue Policies. + * It includes several query conditions such as queue, queues, pageSize, and currentPage. + * + * queue: The specific queue name or identifier to query the Federation Queue Policy for. + * queues: A list of queue names or identifiers for which to query the Federation Queue Policies. + * pageSize: The number of policies to display per page. + * currentPage: The current page number. + */ +@Private +@Unstable +public abstract class QueryFederationQueuePoliciesRequest { + + @Private + @Unstable + public static QueryFederationQueuePoliciesRequest newInstance( + int pageSize, int currentPage, String queue, List queues) { + QueryFederationQueuePoliciesRequest request = + Records.newRecord(QueryFederationQueuePoliciesRequest.class); + request.setPageSize(pageSize); + request.setCurrentPage(currentPage); + request.setQueue(queue); + request.setQueues(queues); + return request; + } + + /** + * Sets the page size for FederationQueuePolicies pagination. + * + * @param pageSize The number of policies to display per page. + */ + @Private + @Unstable + public abstract void setPageSize(int pageSize); + + /** + * Retrieves the page size. + * + * @return The number of policies to display per page. + */ + @Public + @Unstable + public abstract int getPageSize(); + + /** + * Sets the current page in the FederationQueuePolicies pagination. + * + * @param currentPage The current page number. + */ + @Private + @Unstable + public abstract void setCurrentPage(int currentPage); + + /** + * Returns the current page number in the FederationQueuePolicies pagination. + * + * @return The current page number. + */ + @Public + @Unstable + public abstract int getCurrentPage(); + + /** + * Retrieves the queue. + * + * @return The name or identifier of the current queue. + */ + @Public + @Unstable + public abstract String getQueue(); + + /** + * Sets the queue to the specified value. + * + * We will use the fully qualified name matching for queues. + * For example, if the user inputs 'a', we will match + * queues that contain 'a' in their fully qualified names, + * such as 'root.a', 'root.b.a', and so on. + * + * @param queue queue name. + */ + @Private + @Unstable + public abstract void setQueue(String queue); + + /** + * Retrieves a list of queues. + * + * This part contains exact matches, + * which will match the queues contained in the list. + * + * @return A list of queue names or identifiers. + */ + @Public + @Unstable + public abstract List getQueues(); + + /** + * Sets the list of queues to the specified values. + * + * @param queues A list of queue names or identifiers to set. + */ + @Private + @Unstable + public abstract void setQueues(List queues); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesResponse.java new file mode 100644 index 0000000000..b42356230b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/QueryFederationQueuePoliciesResponse.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.List; + +/** + * This is the QueryFederationQueuePoliciesResponse, which contains the following information: + * 1. Number of policy information included, + * 2. total Page number, + * 3. pageSize Conditions passed by the user, + * 4. Result of queue weight information returned. + */ +@Private +@Unstable +public abstract class QueryFederationQueuePoliciesResponse { + + @Private + @Unstable + public static QueryFederationQueuePoliciesResponse newInstance( + int totalSize, int totalPage, int currentPage, int pageSize, + List federationQueueWeights) { + QueryFederationQueuePoliciesResponse response = + Records.newRecord(QueryFederationQueuePoliciesResponse.class); + response.setTotalSize(totalSize); + response.setTotalPage(totalPage); + response.setCurrentPage(currentPage); + response.setPageSize(pageSize); + response.setFederationQueueWeights(federationQueueWeights); + return response; + } + + @Private + @Unstable + public static QueryFederationQueuePoliciesResponse newInstance() { + QueryFederationQueuePoliciesResponse response = + Records.newRecord(QueryFederationQueuePoliciesResponse.class); + return response; + } + + /** + * Returns the total size of the query result. + * It is mainly related to the filter conditions set by the user. + * + * @return The total size of the query result. + */ + public abstract int getTotalSize(); + + /** + * Sets the total size of the federationQueueWeights. + * + * @param totalSize The total size of the query result to be set. + */ + public abstract void setTotalSize(int totalSize); + + /** + * Returns the page. + * + * @return page. + */ + @Public + @Unstable + public abstract int getTotalPage(); + + /** + * Sets the page. + * + * @param page page. + */ + @Private + @Unstable + public abstract void setTotalPage(int page); + + /** + * Returns the current page number in the FederationQueuePolicies pagination. + * + * @return The current page number. + */ + @Public + @Unstable + public abstract int getCurrentPage(); + + /** + * Sets the current page in the FederationQueuePolicies pagination. + * + * @param currentPage The current page number. + */ + @Private + @Unstable + public abstract void setCurrentPage(int currentPage); + + + /** + * Retrieves the page size. + * + * @return The number of policies to display per page. + */ + @Public + @Unstable + public abstract int getPageSize(); + + /** + * Sets the page size for FederationQueuePolicies pagination. + * + * @param pageSize The number of policies to display per page. + */ + @Private + @Unstable + public abstract void setPageSize(int pageSize); + + /** + * Get a list of FederationQueueWeight objects of different queues. + * + * @return list of FederationQueueWeight. + */ + @Public + @Unstable + public abstract List getFederationQueueWeights(); + + /** + * Sets the FederationQueueWeights, which represent the weights of different queues. + * + * @param federationQueueWeights list of FederationQueueWeight. + */ + @Private + @Unstable + public abstract void setFederationQueueWeights( + List federationQueueWeights); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto index aca7a4c0b8..fcae14128d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto @@ -50,4 +50,5 @@ service ResourceManagerAdministrationProtocolService { rpc deregisterSubCluster(DeregisterSubClusterRequestProto) returns (DeregisterSubClusterResponseProto); rpc saveFederationQueuePolicy(SaveFederationQueuePolicyRequestProto) returns (SaveFederationQueuePolicyResponseProto); rpc batchSaveFederationQueuePolicies(BatchSaveFederationQueuePoliciesRequestProto) returns (BatchSaveFederationQueuePoliciesResponseProto); + rpc listFederationQueuePolicies(QueryFederationQueuePoliciesRequestProto) returns (QueryFederationQueuePoliciesResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto index 06e11f913b..a2945f1eb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto @@ -188,6 +188,21 @@ message BatchSaveFederationQueuePoliciesResponseProto { required string message = 1; } +message QueryFederationQueuePoliciesRequestProto { + optional int32 pageSize = 1; + optional int32 currentPage = 2; + optional string queue = 3; + repeated string queues = 4; +} + +message QueryFederationQueuePoliciesResponseProto { + optional int32 totalSize = 1; + optional int32 totalPage = 2; + optional int32 currentPage = 3; + optional int32 pageSize = 4; + repeated FederationQueueWeightProto federationQueueWeights = 5; +} + ////////////////////////////////////////////////////////////////// ///////////// RM Failover related records //////////////////////// ////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java index 788ef8feb1..0aa02c8124 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RouterCLI.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -65,6 +67,7 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkHeadRoomAlphaValid; import static org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight.checkSubClusterQueueWeightRatioValid; @@ -85,7 +88,8 @@ public class RouterCLI extends Configured implements Tool { // Command2: policy .put("-policy", new UsageInfo( "[-s|--save [queue;router weight;amrm weight;headroomalpha]] " + - "[-bs|--batch-save [--format xml] [-f|--input-file fileName]]", + "[-bs|--batch-save [--format xml] [-f|--input-file fileName]]" + + "[-l|--list [--pageSize][--currentPage][--queue][--queues]]", "We provide a set of commands for Policy:" + " Include list policies, save policies, batch save policies. " + " (Note: The policy type will be directly read from the" + @@ -122,6 +126,12 @@ public class RouterCLI extends Configured implements Tool { private static final String OPTION_FORMAT = "format"; private static final String OPTION_FILE = "f"; private static final String OPTION_INPUT_FILE = "input-file"; + private static final String OPTION_L = "l"; + private static final String OPTION_LIST = "list"; + private static final String OPTION_PAGE_SIZE = "pageSize"; + private static final String OPTION_CURRENT_PAGE = "currentPage"; + private static final String OPTION_QUEUE = "queue"; + private static final String OPTION_QUEUES = "queues"; private static final String CMD_POLICY = "-policy"; private static final String FORMAT_XML = "xml"; @@ -134,6 +144,12 @@ public class RouterCLI extends Configured implements Tool { private static final String XML_TAG_QUEUE = "queue"; private static final String XML_TAG_NAME = "name"; + private static final String LIST_POLICIES_TITLE = + "Yarn Federation Queue Policies"; + // Columns information + private static final List LIST_POLICIES_HEADER = Arrays.asList( + "Queue Name", "AMRM Weight", "Router Weight"); + public RouterCLI() { super(); } @@ -191,7 +207,8 @@ public class RouterCLI extends Configured implements Tool { .append("routeradmin\n") .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]\n") .append(" [-policy [-s|--save [queue;router weight;amrm weight;headroomalpha] " + - "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]]\n") + "[-bs|--batch-save [--format xml,json] [-f|--input-file fileName]]] " + + "[-l|--list [--pageSize][--currentPage][--queue][--queues]]\n") .append(" [-help [cmd]]").append("\n"); StringBuilder helpBuilder = new StringBuilder(); System.out.println(summary); @@ -346,11 +363,26 @@ public class RouterCLI extends Configured implements Tool { Option fileOpt = new Option("f", "input-file", true, "The location of the input configuration file. "); formatOpt.setOptionalArg(true); - + Option listOpt = new Option(OPTION_L, OPTION_LIST, false, + "We can display the configured queue strategy according to the parameters."); + Option pageSizeOpt = new Option(null, "pageSize", true, + "The number of policies displayed per page."); + Option currentPageOpt = new Option(null, "currentPage", true, + "Since users may configure numerous policies, we will choose to display them in pages. " + + "This parameter represents the page number to be displayed."); + Option queueOpt = new Option(null, "queue", true, + "the queue we need to filter. example: root.a"); + Option queuesOpt = new Option(null, "queues", true, + "list of queues to filter. example: root.a,root.b,root.c"); opts.addOption(saveOpt); opts.addOption(batchSaveOpt); opts.addOption(formatOpt); opts.addOption(fileOpt); + opts.addOption(listOpt); + opts.addOption(pageSizeOpt); + opts.addOption(currentPageOpt); + opts.addOption(queueOpt); + opts.addOption(queuesOpt); // Parse command line arguments. CommandLine cliParser; @@ -396,6 +428,31 @@ public class RouterCLI extends Configured implements Tool { // Batch SavePolicies. return handBatchSavePolicies(format, filePath); + } else if(cliParser.hasOption(OPTION_L) || cliParser.hasOption(OPTION_LIST)) { + + int pageSize = 10; + if (cliParser.hasOption(OPTION_PAGE_SIZE)) { + pageSize = Integer.parseInt(cliParser.getOptionValue(OPTION_PAGE_SIZE)); + } + + int currentPage = 1; + if (cliParser.hasOption(OPTION_CURRENT_PAGE)) { + currentPage = Integer.parseInt(cliParser.getOptionValue(OPTION_CURRENT_PAGE)); + } + + String queue = null; + if (cliParser.hasOption(OPTION_QUEUE)) { + queue = cliParser.getOptionValue(OPTION_QUEUE); + } + + List queues = null; + if (cliParser.hasOption(OPTION_QUEUES)) { + String tmpQueues = cliParser.getOptionValue(OPTION_QUEUES); + queues = Arrays.stream(tmpQueues.split(",")).collect(Collectors.toList()); + } + + // List Policies. + return handListPolicies(pageSize, currentPage, queue, queues); } else { // printUsage printUsage(args[0]); @@ -618,6 +675,46 @@ public class RouterCLI extends Configured implements Tool { return StringUtils.join(amRmPolicyWeights, ","); } + /** + * Handles the list federation policies based on the specified parameters. + * + * @param pageSize Records displayed per page. + * @param currentPage The current page number. + * @param queue The name of the queue to be filtered. + * @param queues list of queues to filter. + * @return 0, success; 1, failed. + */ + protected int handListPolicies(int pageSize, int currentPage, String queue, List queues) { + LOG.info("List Federation Policies, pageSize = {}, currentPage = {}, queue = {}, queues = {}", + pageSize, currentPage, queue, queues); + try { + PrintWriter writer = new PrintWriter(new OutputStreamWriter( + System.out, Charset.forName(StandardCharsets.UTF_8.name()))); + QueryFederationQueuePoliciesRequest request = + QueryFederationQueuePoliciesRequest.newInstance(pageSize, currentPage, queue, queues); + ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); + QueryFederationQueuePoliciesResponse response = + adminProtocol.listFederationQueuePolicies(request); + System.out.println("TotalPage = " + response.getTotalPage()); + + FormattingCLIUtils formattingCLIUtils = new FormattingCLIUtils(LIST_POLICIES_TITLE) + .addHeaders(LIST_POLICIES_HEADER); + List federationQueueWeights = response.getFederationQueueWeights(); + federationQueueWeights.forEach(federationQueueWeight -> { + String queueName = federationQueueWeight.getQueue(); + String amrmWeight = federationQueueWeight.getAmrmWeight(); + String routerWeight = federationQueueWeight.getRouterWeight(); + formattingCLIUtils.addLine(queueName, amrmWeight, routerWeight); + }); + writer.print(formattingCLIUtils.render()); + writer.flush(); + return EXIT_SUCCESS; + } catch (YarnException | IOException e) { + LOG.error("handleSavePolicy error.", e); + return EXIT_ERROR; + } + } + @Override public int run(String[] args) throws Exception { YarnConfiguration yarnConf = getConf() == null ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java index 476ba75263..6ed83826df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRouterCLI.java @@ -29,6 +29,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.junit.Before; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -78,6 +80,19 @@ public class TestRouterCLI { return SaveFederationQueuePolicyResponse.newInstance("success"); }); + when(admin.listFederationQueuePolicies(any(QueryFederationQueuePoliciesRequest.class))) + .thenAnswer((Answer) invocationOnMock -> { + // Step1. parse request. + Object obj = invocationOnMock.getArgument(0); + QueryFederationQueuePoliciesRequest request = (QueryFederationQueuePoliciesRequest) obj; + String queue = request.getQueue(); + List weights = new ArrayList<>(); + FederationQueueWeight weight = FederationQueueWeight.newInstance( + "SC-1:0.8,SC-2:0.2", "SC-1:0.6,SC-2:0.4", "1", queue, "test"); + weights.add(weight); + return QueryFederationQueuePoliciesResponse.newInstance(1, 1, 1, 10, weights); + }); + Configuration config = new Configuration(); config.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); @@ -240,4 +255,15 @@ public class TestRouterCLI { assertEquals("SC-1:0.8,SC-2:0.2", queueWeight2.getAmrmWeight()); assertEquals("SC-1:0.6,SC-2:0.4", queueWeight2.getRouterWeight()); } + + @Test + public void testListPolicies() throws Exception { + PrintStream oldOutPrintStream = System.out; + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + System.setOut(new PrintStream(dataOut)); + oldOutPrintStream.println(dataOut); + + String[] args = {"-policy", "-l", "--queue", "root.a"}; + assertEquals(0, rmAdminCLI.run(args)); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java index 7107f8014e..36aecf141a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Updat import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.DeregisterSubClusterRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.SaveFederationQueuePolicyRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.BatchSaveFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProto; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB; import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest; @@ -84,6 +85,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -118,6 +121,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -400,4 +405,18 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour return null; } } + + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + QueryFederationQueuePoliciesRequestProto requestProto = + ((QueryFederationQueuePoliciesRequestPBImpl) request).getProto(); + try { + return new QueryFederationQueuePoliciesResponsePBImpl( + proxy.listFederationQueuePolicies(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java index 01feef9d9f..0eab80b9ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java @@ -32,6 +32,8 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGr import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodesToAttributesMappingRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.NodesToAttributesMappingResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto; @@ -83,6 +85,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.CheckForDecommissioningNodesRequestPBImpl; @@ -117,6 +121,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.SaveFederationQueuePolicyResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.BatchSaveFederationQueuePoliciesResponsePBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesRequestPBImpl; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.QueryFederationQueuePoliciesResponsePBImpl; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -422,4 +428,21 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou throw new ServiceException(e); } } + + @Override + public QueryFederationQueuePoliciesResponseProto listFederationQueuePolicies( + RpcController controller, QueryFederationQueuePoliciesRequestProto proto) + throws ServiceException { + QueryFederationQueuePoliciesRequest request = + new QueryFederationQueuePoliciesRequestPBImpl(proto); + try { + QueryFederationQueuePoliciesResponse response = + real.listFederationQueuePolicies(request); + return ((QueryFederationQueuePoliciesResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesRequestPBImpl.java new file mode 100644 index 0000000000..b56ec663ce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesRequestPBImpl.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesRequestProto; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; + +import java.util.ArrayList; +import java.util.List; + +public class QueryFederationQueuePoliciesRequestPBImpl + extends QueryFederationQueuePoliciesRequest { + + private QueryFederationQueuePoliciesRequestProto proto = + QueryFederationQueuePoliciesRequestProto.getDefaultInstance(); + private QueryFederationQueuePoliciesRequestProto.Builder builder = null; + private boolean viaProto = false; + private List queues = null; + + public QueryFederationQueuePoliciesRequestPBImpl() { + builder = QueryFederationQueuePoliciesRequestProto.newBuilder(); + } + + public QueryFederationQueuePoliciesRequestPBImpl( + QueryFederationQueuePoliciesRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public void setPageSize(int pageSize) { + maybeInitBuilder(); + Preconditions.checkNotNull(builder); + builder.setPageSize(pageSize); + } + + @Override + public int getPageSize() { + QueryFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasPageSize = p.hasPageSize(); + if (hasPageSize) { + return p.getPageSize(); + } + return 0; + } + + @Override + public void setCurrentPage(int currentPage) { + maybeInitBuilder(); + Preconditions.checkNotNull(builder); + builder.setCurrentPage(currentPage); + } + + @Override + public int getCurrentPage() { + QueryFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasCurrentPage = p.hasCurrentPage(); + if (hasCurrentPage) { + return p.getCurrentPage(); + } + return 0; + } + + @Override + public String getQueue() { + QueryFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + boolean hasQueue = p.hasQueue(); + if (hasQueue) { + return p.getQueue(); + } + return null; + } + + @Override + public void setQueue(String queue) { + maybeInitBuilder(); + if (queue == null) { + builder.clearQueue(); + return; + } + builder.setQueue(queue); + } + + @Override + public List getQueues() { + if (this.queues != null) { + return this.queues; + } + initQueues(); + return this.queues; + } + + @Override + public void setQueues(List pQueues) { + if (pQueues == null || pQueues.isEmpty()) { + maybeInitBuilder(); + if (this.queues != null) { + this.queues.clear(); + } + return; + } + if (this.queues == null) { + this.queues = new ArrayList<>(); + } + this.queues.clear(); + this.queues.addAll(pQueues); + } + + private void initQueues() { + if (this.queues != null) { + return; + } + QueryFederationQueuePoliciesRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getQueuesList(); + this.queues = new ArrayList<>(); + this.queues.addAll(list); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + public QueryFederationQueuePoliciesRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.queues != null) { + addQueuesToProto(); + } + } + + private void addQueuesToProto() { + maybeInitBuilder(); + builder.clearQueue(); + if (this.queues == null) { + return; + } + builder.addAllQueues(this.queues); + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = QueryFederationQueuePoliciesRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesResponsePBImpl.java new file mode 100644 index 0000000000..8cfae749d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/QueryFederationQueuePoliciesResponsePBImpl.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.thirdparty.protobuf.TextFormat; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.yarn.proto.YarnProtos.FederationQueueWeightProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.QueryFederationQueuePoliciesResponseProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.FederationQueueWeight; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; + +import java.util.ArrayList; +import java.util.List; + +public class QueryFederationQueuePoliciesResponsePBImpl + extends QueryFederationQueuePoliciesResponse { + + private QueryFederationQueuePoliciesResponseProto proto = + QueryFederationQueuePoliciesResponseProto.getDefaultInstance(); + private QueryFederationQueuePoliciesResponseProto.Builder builder = null; + private boolean viaProto = false; + private List federationQueueWeights = null; + + public QueryFederationQueuePoliciesResponsePBImpl() { + builder = QueryFederationQueuePoliciesResponseProto.newBuilder(); + } + + public QueryFederationQueuePoliciesResponsePBImpl( + QueryFederationQueuePoliciesResponseProto proto) { + this.proto = proto; + this.viaProto = true; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + if (this.federationQueueWeights != null) { + for (FederationQueueWeight federationQueueWeight : federationQueueWeights) { + FederationQueueWeightPBImpl federationQueueWeightPBImpl = + (FederationQueueWeightPBImpl) federationQueueWeight; + builder.addFederationQueueWeights(federationQueueWeightPBImpl.getProto()); + } + } + proto = builder.build(); + viaProto = true; + } + + public QueryFederationQueuePoliciesResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = QueryFederationQueuePoliciesResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getTotalSize() { + QueryFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasTotalSize = p.hasTotalSize(); + if (hasTotalSize) { + return p.getTotalSize(); + } + return 0; + } + + @Override + public void setTotalSize(int totalSize) { + maybeInitBuilder(); + Preconditions.checkNotNull(builder); + builder.setTotalSize(totalSize); + } + + @Override + public int getTotalPage() { + QueryFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasTotalPage = p.hasTotalPage(); + if (hasTotalPage) { + return p.getTotalPage(); + } + return 0; + } + + @Override + public void setTotalPage(int totalPage) { + maybeInitBuilder(); + Preconditions.checkNotNull(builder); + builder.setTotalPage(totalPage); + } + + @Override + public int getCurrentPage() { + QueryFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasCurrentPage = p.hasCurrentPage(); + if (hasCurrentPage) { + return p.getCurrentPage(); + } + return 0; + } + + @Override + public void setCurrentPage(int currentPage) { + maybeInitBuilder(); + Preconditions.checkNotNull(builder); + builder.setCurrentPage(currentPage); + } + + @Override + public int getPageSize() { + QueryFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + boolean hasPageSize = p.hasPageSize(); + if (hasPageSize) { + return p.getPageSize(); + } + return 0; + } + + @Override + public void setPageSize(int pageSize) { + Preconditions.checkNotNull(builder); + builder.setPageSize(pageSize); + } + + private void initFederationQueueWeightsMapping() { + if (this.federationQueueWeights != null) { + return; + } + + QueryFederationQueuePoliciesResponseProtoOrBuilder p = viaProto ? proto : builder; + List queryFederationQueuePoliciesProtoList = + p.getFederationQueueWeightsList(); + + List fqWeights = new ArrayList<>(); + if (queryFederationQueuePoliciesProtoList == null || + queryFederationQueuePoliciesProtoList.size() == 0) { + this.federationQueueWeights = fqWeights; + return; + } + + for (FederationQueueWeightProto federationQueueWeightProto : + queryFederationQueuePoliciesProtoList) { + fqWeights.add(new FederationQueueWeightPBImpl(federationQueueWeightProto)); + } + + this.federationQueueWeights = fqWeights; + } + + + @Override + public List getFederationQueueWeights() { + initFederationQueueWeightsMapping(); + return this.federationQueueWeights; + } + + @Override + public void setFederationQueueWeights(List pfederationQueueWeights) { + maybeInitBuilder(); + if (federationQueueWeights == null) { + federationQueueWeights = new ArrayList<>(); + } + if(pfederationQueueWeights == null) { + builder.clearFederationQueueWeights(); + return; + } + federationQueueWeights.clear(); + federationQueueWeights.addAll(pfederationQueueWeights); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 0fc5289843..ed66dadefb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -179,6 +179,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; /** @@ -980,6 +982,12 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, return null; } + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + return null; + } + @VisibleForTesting public HashMap> getApplicationContainerIdMap() { return applicationContainerIdMap; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 7f1c7b93e9..8f9e8caa53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -101,6 +101,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; @@ -1095,6 +1097,14 @@ public class AdminService extends CompositeService implements " Please call Router's batchSaveFederationQueuePolicies to set Policies."); } + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + throw new YarnException("It is not allowed to call the RM's " + + " listFederationQueuePolicies. " + + " Please call Router's listFederationQueuePolicies to list Policies."); + } + private void validateAttributesExists( List nodesToAttributes) throws IOException { NodeAttributesManager nodeAttributesManager = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java index 6503765ede..d0e4825fed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java @@ -153,6 +153,8 @@ public final class RouterMetrics { private MutableGaugeInt numSaveFederationQueuePolicyFailedRetrieved; @Metric("# of batchSaveFederationQueuePolicies failed to be retrieved") private MutableGaugeInt numBatchSaveFederationQueuePoliciesFailedRetrieved; + @Metric("# of listFederationQueuePolicies failed to be retrieved") + private MutableGaugeInt numListFederationQueuePoliciesFailedRetrieved; @Metric("# of refreshAdminAcls failed to be retrieved") private MutableGaugeInt numRefreshAdminAclsFailedRetrieved; @Metric("# of refreshServiceAcls failed to be retrieved") @@ -303,6 +305,8 @@ public final class RouterMetrics { private MutableRate totalSucceededSaveFederationQueuePolicyRetrieved; @Metric("Total number of successful Retrieved BatchSaveFederationQueuePolicies and latency(ms)") private MutableRate totalSucceededBatchSaveFederationQueuePoliciesRetrieved; + @Metric("Total number of successful Retrieved ListFederationQueuePolicies and latency(ms)") + private MutableRate totalSucceededListFederationQueuePoliciesFailedRetrieved; @Metric("Total number of successful Retrieved RefreshAdminAcls and latency(ms)") private MutableRate totalSucceededRefreshAdminAclsRetrieved; @Metric("Total number of successful Retrieved RefreshServiceAcls and latency(ms)") @@ -391,6 +395,7 @@ public final class RouterMetrics { private MutableQuantiles refreshDeregisterSubClusterLatency; private MutableQuantiles saveFederationQueuePolicyLatency; private MutableQuantiles batchSaveFederationQueuePoliciesLatency; + private MutableQuantiles listFederationQueuePoliciesLatency; private MutableQuantiles refreshAdminAclsLatency; private MutableQuantiles refreshServiceAclsLatency; private MutableQuantiles replaceLabelsOnNodesLatency; @@ -609,6 +614,10 @@ public final class RouterMetrics { "batchSaveFederationQueuePoliciesLatency", "latency of batch save federationqueuepolicies timeouts", "ops", "latency", 10); + listFederationQueuePoliciesLatency = registry.newQuantiles( + "listFederationQueuePoliciesLatency", + "latency of list federationqueuepolicies timeouts", "ops", "latency", 10); + refreshAdminAclsLatency = registry.newQuantiles("refreshAdminAclsLatency", "latency of refresh admin acls timeouts", "ops", "latency", 10); @@ -948,6 +957,11 @@ public final class RouterMetrics { return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().numSamples(); } + @VisibleForTesting + public long getNumSucceededListFederationQueuePoliciesFailedRetrieved() { + return totalSucceededListFederationQueuePoliciesFailedRetrieved.lastStat().numSamples(); + } + @VisibleForTesting public long getNumSucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().numSamples(); @@ -1303,6 +1317,11 @@ public final class RouterMetrics { return totalSucceededBatchSaveFederationQueuePoliciesRetrieved.lastStat().mean(); } + @VisibleForTesting + public double getLatencySucceededListFederationQueuePoliciesRetrieved() { + return totalSucceededListFederationQueuePoliciesFailedRetrieved.lastStat().mean(); + } + @VisibleForTesting public double getLatencySucceededRefreshAdminAclsRetrieved() { return totalSucceededRefreshAdminAclsRetrieved.lastStat().mean(); @@ -1606,6 +1625,10 @@ public final class RouterMetrics { return numBatchSaveFederationQueuePoliciesFailedRetrieved.value(); } + public int getListFederationQueuePoliciesFailedRetrieved() { + return numListFederationQueuePoliciesFailedRetrieved.value(); + } + public int getNumRefreshAdminAclsFailedRetrieved() { return numRefreshAdminAclsFailedRetrieved.value(); } @@ -1968,6 +1991,11 @@ public final class RouterMetrics { batchSaveFederationQueuePoliciesLatency.add(duration); } + public void succeededListFederationQueuePoliciesRetrieved(long duration) { + totalSucceededListFederationQueuePoliciesFailedRetrieved.add(duration); + listFederationQueuePoliciesLatency.add(duration); + } + public void succeededRefreshAdminAclsRetrieved(long duration) { totalSucceededRefreshAdminAclsRetrieved.add(duration); refreshAdminAclsLatency.add(duration); @@ -2254,6 +2282,10 @@ public final class RouterMetrics { numBatchSaveFederationQueuePoliciesFailedRetrieved.incr(); } + public void incrListFederationQueuePoliciesFailedRetrieved() { + numListFederationQueuePoliciesFailedRetrieved.incr(); + } + public void incrRefreshAdminAclsFailedRetrieved() { numRefreshAdminAclsFailedRetrieved.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java index ed0698b1a8..23517b97b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java @@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,4 +227,10 @@ public class DefaultRMAdminRequestInterceptor BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { return rmAdminProxy.batchSaveFederationQueuePolicies(request); } + + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + return rmAdminProxy.listFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index a80e31f67d..268522c1b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.router.rmadmin; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; @@ -68,6 +69,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -84,6 +87,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.ArrayList; import java.util.Map; @@ -995,6 +999,213 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep throw new YarnException("Unable to batchSaveFederationQueuePolicies."); } + /** + * List the Queue Policies for the Federation. + * + * @param request QueryFederationQueuePolicies Request. + * @return QueryFederationQueuePolicies Response. + * + * @throws YarnException indicates exceptions from yarn servers. + * @throws IOException io error occurs. + */ + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + + // Parameter validation. + if (request == null) { + routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "Missing ListFederationQueuePolicies Request.", null); + } + + if (request.getPageSize() <= 0) { + routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "PageSize cannot be negative or zero.", null); + } + + if (request.getCurrentPage() <= 0) { + routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException( + "CurrentPage cannot be negative or zero.", null); + } + + try { + QueryFederationQueuePoliciesResponse response = null; + + long startTime = clock.getTime(); + String queue = request.getQueue(); + List queues = request.getQueues(); + int currentPage = request.getCurrentPage(); + int pageSize = request.getPageSize(); + + // Print log + LOG.info("queue = {}, queues={}, currentPage={}, pageSize={}", + queue, queues, currentPage, pageSize); + + Map policiesConfigurations = + federationFacade.getPoliciesConfigurations(); + + // If the queue is not empty, filter according to the queue. + if (StringUtils.isNotBlank(queue)) { + response = filterPoliciesConfigurationsByQueue(queue, policiesConfigurations, + pageSize, currentPage); + } else if(CollectionUtils.isNotEmpty(queues)) { + // If queues are not empty, filter by queues, which may return multiple results. + // We filter by pagination. + response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations, + pageSize, currentPage); + } + long stopTime = clock.getTime(); + routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime); + return response; + } catch (Exception e) { + routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); + RouterServerUtil.logAndThrowException(e, + "Unable to ListFederationQueuePolicies due to exception. " + e.getMessage()); + } + + routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); + throw new YarnException("Unable to listFederationQueuePolicies."); + } + + /** + * According to the configuration information of the queue filtering queue, + * this part should only return 1 result. + * + * @param queue queueName. + * @param policiesConfigurations policy configurations. + * @param pageSize Items per page. + * @param currentPage The number of pages to be queried. + * @return federation queue policies response. + * @throws YarnException indicates exceptions from yarn servers. + * + */ + private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueue(String queue, + Map policiesConfigurations, + int pageSize, int currentPage) throws YarnException { + + // Step1. Check the parameters, if the policy list is empty, return empty directly. + if (MapUtils.isEmpty(policiesConfigurations)) { + return null; + } + SubClusterPolicyConfiguration policyConf = policiesConfigurations.getOrDefault(queue, null); + if(policyConf == null) { + return null; + } + + // Step2. Parse the parameters. + List federationQueueWeights = new ArrayList<>(); + FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf); + federationQueueWeights.add(federationQueueWeight); + + // Step3. Return result. + return QueryFederationQueuePoliciesResponse.newInstance( + 1, 1, currentPage, pageSize, federationQueueWeights); + } + + /** + * Filter queue configuration information based on the queue list. + * + * @param queues The name of the queue. + * @param policiesConfigurations policy configurations. + * @param pageSize Items per page. + * @param currentPage The number of pages to be queried. + * @return federation queue policies response. + * @throws YarnException indicates exceptions from yarn servers. + */ + private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueues( + List queues, Map policiesConfigurations, + int pageSize, int currentPage) throws YarnException { + + // Step1. Check the parameters, if the policy list is empty, return empty directly. + if (MapUtils.isEmpty(policiesConfigurations)) { + return null; + } + + // Step2. Filtering for Queue Policies. + List federationQueueWeights = new ArrayList<>(); + for (String queue : queues) { + SubClusterPolicyConfiguration policyConf = policiesConfigurations.getOrDefault(queue, null); + if(policyConf == null) { + continue; + } + FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf); + if (federationQueueWeight != null) { + federationQueueWeights.add(federationQueueWeight); + } + } + + int startIndex = (currentPage - 1) * pageSize; + int endIndex = Math.min(startIndex + pageSize, federationQueueWeights.size()); + List subFederationQueueWeights = + federationQueueWeights.subList(startIndex, endIndex); + + int totalSize = federationQueueWeights.size(); + int totalPage = + (totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1; + + // Step3. Returns the Queue Policies result. + return QueryFederationQueuePoliciesResponse.newInstance( + totalSize, totalPage, currentPage, pageSize, subFederationQueueWeights); + } + + /** + * Parses a FederationQueueWeight from the given queue and SubClusterPolicyConfiguration. + * + * @param queue The name of the queue. + * @param policyConf policy configuration. + * @return Queue weights for representing Federation. + * @throws YarnException YarnException indicates exceptions from yarn servers. + */ + private FederationQueueWeight parseFederationQueueWeight(String queue, + SubClusterPolicyConfiguration policyConf) throws YarnException { + + if (policyConf != null) { + ByteBuffer params = policyConf.getParams(); + WeightedPolicyInfo weightedPolicyInfo = WeightedPolicyInfo.fromByteBuffer(params); + Map amrmPolicyWeights = weightedPolicyInfo.getAMRMPolicyWeights(); + Map routerPolicyWeights = + weightedPolicyInfo.getRouterPolicyWeights(); + float headroomAlpha = weightedPolicyInfo.getHeadroomAlpha(); + String policyManagerClassName = policyConf.getType(); + + String amrmPolicyWeight = parsePolicyWeights(amrmPolicyWeights); + String routerPolicyWeight = parsePolicyWeights(routerPolicyWeights); + + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(amrmPolicyWeight); + FederationQueueWeight.checkSubClusterQueueWeightRatioValid(routerPolicyWeight); + + return FederationQueueWeight.newInstance(routerPolicyWeight, amrmPolicyWeight, + String.valueOf(headroomAlpha), queue, policyManagerClassName); + } + + return null; + } + + /** + * Parses the policy weights from the provided policyWeights map. + * returns a string similar to the following: + * SC-1:0.7,SC-2:0.3 + * + * @param policyWeights + * A map containing SubClusterIdInfo as keys and corresponding weight values. + * @return A string representation of the parsed policy weights. + */ + protected String parsePolicyWeights(Map policyWeights) { + if (MapUtils.isEmpty(policyWeights)) { + return null; + } + List policyWeightList = new ArrayList<>(); + for (Map.Entry entry : policyWeights.entrySet()) { + SubClusterIdInfo key = entry.getKey(); + Float value = entry.getValue(); + policyWeightList.add(key.toId() + ":" + value); + } + return StringUtils.join(policyWeightList, ","); + } + /** * Save FederationQueuePolicy. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java index 3b760e28f7..fc2278d3bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java @@ -69,6 +69,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.router.RouterServerUtil; import org.apache.hadoop.yarn.server.router.security.authorize.RouterPolicyProvider; import org.apache.hadoop.yarn.util.LRUCacheHashMap; @@ -410,4 +412,11 @@ public class RouterRMAdminService extends AbstractService RequestInterceptorChainWrapper pipeline = getInterceptorChain(); return pipeline.getRootInterceptor().batchSaveFederationQueuePolicies(request); } + + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + RequestInterceptorChainWrapper pipeline = getInterceptorChain(); + return pipeline.getRootInterceptor().listFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java index f62ddb2796..60d5e5303c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java @@ -633,6 +633,11 @@ public class TestRouterMetrics { LOG.info("Mocked: failed BatchSaveFederationQueuePolicies call"); metrics.incrBatchSaveFederationQueuePoliciesFailedRetrieved(); } + + public void getListFederationQueuePoliciesFailedRetrieved() { + LOG.info("Mocked: failed ListFederationQueuePolicies call"); + metrics.incrListFederationQueuePoliciesFailedRetrieved(); + } } // Records successes for all calls @@ -974,6 +979,12 @@ public class TestRouterMetrics { " call with duration {}", duration); metrics.succeededBatchSaveFederationQueuePoliciesRetrieved(duration); } + + public void getListFederationQueuePoliciesRetrieved(long duration) { + LOG.info("Mocked: successful ListFederationQueuePoliciesRetrieved " + + " call with duration {}", duration); + metrics.succeededListFederationQueuePoliciesRetrieved(duration); + } } @Test @@ -2277,4 +2288,27 @@ public class TestRouterMetrics { metrics.getLatencySucceededBatchSaveFederationQueuePoliciesRetrieved(), ASSERT_DOUBLE_DELTA); } + + @Test + public void testListFederationQueuePoliciesFailedRetrieved() { + long totalBadBefore = metrics.getListFederationQueuePoliciesFailedRetrieved(); + badSubCluster.getListFederationQueuePoliciesFailedRetrieved(); + Assert.assertEquals(totalBadBefore + 1, + metrics.getListFederationQueuePoliciesFailedRetrieved()); + } + + @Test + public void testListFederationQueuePoliciesRetrieved() { + long totalGoodBefore = metrics.getNumSucceededListFederationQueuePoliciesFailedRetrieved(); + goodSubCluster.getListFederationQueuePoliciesRetrieved(150); + Assert.assertEquals(totalGoodBefore + 1, + metrics.getNumSucceededListFederationQueuePoliciesFailedRetrieved()); + Assert.assertEquals(150, + metrics.getLatencySucceededListFederationQueuePoliciesRetrieved(), ASSERT_DOUBLE_DELTA); + goodSubCluster.getListFederationQueuePoliciesRetrieved(300); + Assert.assertEquals(totalGoodBefore + 2, + metrics.getNumSucceededListFederationQueuePoliciesFailedRetrieved()); + Assert.assertEquals(225, + metrics.getLatencySucceededListFederationQueuePoliciesRetrieved(), ASSERT_DOUBLE_DELTA); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java index 263926e728..6fe218c7b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java @@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; /** * Mock interceptor that does not do anything other than forwarding it to the @@ -177,4 +179,10 @@ public class PassThroughRMAdminRequestInterceptor BatchSaveFederationQueuePoliciesRequest request) throws YarnException, IOException { return getNextInterceptor().batchSaveFederationQueuePolicies(request); } + + @Override + public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( + QueryFederationQueuePoliciesRequest request) throws YarnException, IOException { + return getNextInterceptor().listFederationQueuePolicies(request); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 20266d5b54..8439b01222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -62,6 +62,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePoli import org.apache.hadoop.yarn.server.api.protocolrecords.SaveFederationQueuePolicyResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.BatchSaveFederationQueuePoliciesResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.QueryFederationQueuePoliciesResponse; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.manager.WeightedLocalityPolicyManager; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; @@ -82,6 +84,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.HashSet; @@ -802,10 +805,10 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { * Generating Policy Weight Data. * * @param pSubClusters set of sub-clusters. - * @return policy Weight String, like SC-1:0.7,SC-2:0. + * @return policy Weight String, like SC-1:0.7,SC-2:0.3 */ private String generatePolicyWeight(List pSubClusters) { - List weights = generateWeights(subClusters.size()); + List weights = generateWeights(pSubClusters.size()); List subClusterWeight = new ArrayList<>(); for (int i = 0; i < pSubClusters.size(); i++) { String subCluster = pSubClusters.get(i); @@ -844,4 +847,123 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest { return formattedRandomNumbers; } + + @Test + public void testParsePolicyWeights() { + Map policyWeights = new LinkedHashMap<>(); + SubClusterIdInfo sc1 = new SubClusterIdInfo("SC-1"); + policyWeights.put(sc1, 0.7f); + SubClusterIdInfo sc2 = new SubClusterIdInfo("SC-2"); + policyWeights.put(sc2, 0.3f); + String policyWeight = interceptor.parsePolicyWeights(policyWeights); + assertEquals("SC-1:0.7,SC-2:0.3", policyWeight); + } + + @Test + public void testFilterPoliciesConfigurationsByQueues() throws Exception { + // SubClusters : SC-1,SC-2 + List subClusterLists = new ArrayList<>(); + subClusterLists.add("SC-1"); + subClusterLists.add("SC-2"); + + // We initialize 26 queues, queue root.a~root.z + List federationQueueWeights = new ArrayList<>(); + for (char letter = 'a'; letter <= 'z'; letter++) { + FederationQueueWeight leaf = + generateFederationQueueWeight("root." + letter, subClusterLists); + federationQueueWeights.add(leaf); + } + + // Save Queue Policies in Batches + BatchSaveFederationQueuePoliciesRequest request = + BatchSaveFederationQueuePoliciesRequest.newInstance(federationQueueWeights); + + BatchSaveFederationQueuePoliciesResponse policiesResponse = + interceptor.batchSaveFederationQueuePolicies(request); + + assertNotNull(policiesResponse); + assertNotNull(policiesResponse.getMessage()); + assertEquals("batch save policies success.", policiesResponse.getMessage()); + + // We query 12 queues, root.a ~ root.l + List queues = new ArrayList<>(); + for (char letter = 'a'; letter <= 'l'; letter++) { + queues.add("root." + letter); + } + + // Queue1: We query page 1, 10 items per page, and the returned result should be 10 items. + // TotalPage should be 2, TotalSize should be 12. + QueryFederationQueuePoliciesRequest request1 = + QueryFederationQueuePoliciesRequest.newInstance(10, 1, "", queues); + QueryFederationQueuePoliciesResponse response1 = + interceptor.listFederationQueuePolicies(request1); + assertNotNull(response1); + assertEquals(1, response1.getCurrentPage()); + assertEquals(10, response1.getPageSize()); + assertEquals(2, response1.getTotalPage()); + assertEquals(12, response1.getTotalSize()); + List federationQueueWeights1 = response1.getFederationQueueWeights(); + assertNotNull(federationQueueWeights1); + assertEquals(10, federationQueueWeights1.size()); + + // Queue2: We query page 1, 12 items per page, and the returned result should be 12 items. + // TotalPage should be 1, TotalSize should be 12. + QueryFederationQueuePoliciesRequest request2 = + QueryFederationQueuePoliciesRequest.newInstance(12, 1, "", queues); + QueryFederationQueuePoliciesResponse response2 = + interceptor.listFederationQueuePolicies(request2); + assertNotNull(response2); + assertEquals(1, response2.getCurrentPage()); + assertEquals(12, response2.getPageSize()); + assertEquals(1, response2.getTotalPage()); + assertEquals(12, response2.getTotalSize()); + List federationQueueWeights2 = response2.getFederationQueueWeights(); + assertNotNull(federationQueueWeights2); + assertEquals(12, federationQueueWeights2.size()); + + // Queue3: Boundary limit exceeded + // We filter 12 queues, should return 12 records, 12 per page, + // should return 1 page, but we are going to return page 2. + QueryFederationQueuePoliciesRequest request3 = + QueryFederationQueuePoliciesRequest.newInstance(12, 2, "", queues); + QueryFederationQueuePoliciesResponse response3 = + interceptor.listFederationQueuePolicies(request3); + assertNotNull(response3); + assertEquals(2, response3.getCurrentPage()); + assertEquals(12, response3.getPageSize()); + assertEquals(1, response2.getTotalPage()); + assertEquals(12, response3.getTotalSize()); + List federationQueueWeights3 = response3.getFederationQueueWeights(); + assertNotNull(federationQueueWeights3); + assertEquals(0, federationQueueWeights3.size()); + + // Queue4: Boundary limit exceeded + // We pass in some negative parameters and we will get some exceptions + QueryFederationQueuePoliciesRequest request4 = + QueryFederationQueuePoliciesRequest.newInstance(-1, 2, "", queues); + LambdaTestUtils.intercept(YarnException.class, "PageSize cannot be negative or zero.", + () -> interceptor.listFederationQueuePolicies(request4)); + + // Queue5: Boundary limit exceeded + // We pass in some negative parameters and we will get some exceptions + QueryFederationQueuePoliciesRequest request5 = + QueryFederationQueuePoliciesRequest.newInstance(10, -1, "", queues); + LambdaTestUtils.intercept(YarnException.class, "CurrentPage cannot be negative or zero.", + () -> interceptor.listFederationQueuePolicies(request5)); + + // Queue6: We use Queue as the condition, + // at this time we will only get the only one return value. + QueryFederationQueuePoliciesRequest request6 = + QueryFederationQueuePoliciesRequest.newInstance(10, 1, "root.a", null); + QueryFederationQueuePoliciesResponse response6 = + interceptor.listFederationQueuePolicies(request6); + assertNotNull(response6); + assertEquals(1, response6.getCurrentPage()); + assertEquals(10, response6.getPageSize()); + assertEquals(1, response6.getTotalPage()); + assertEquals(1, response6.getTotalSize()); + List federationQueueWeights6 = response6.getFederationQueueWeights(); + assertNotNull(federationQueueWeights6); + assertEquals(1, federationQueueWeights6.size()); + } }