diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java index 465f1ad7c3..dd596aad56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.MissingArgumentException; import org.apache.commons.cli.Option; @@ -85,6 +86,7 @@ import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import static org.apache.hadoop.yarn.client.util.YarnClientUtils.NO_LABEL_ERR_MSG; +import static org.apache.hadoop.yarn.client.util.YarnClientUtils.isYarnFederationEnabled; @Private @Unstable @@ -99,7 +101,9 @@ public class RMAdminCLI extends HAAdmin { "Invalid timeout specified : "; private static final Pattern RESOURCE_TYPES_ARGS_PATTERN = Pattern.compile("^[0-9]*$"); - + private static final String SUBCLUSTERID = "subClusterId"; + private static final Option OPTION_SUBCLUSTERID = new Option(SUBCLUSTERID, true, + "We support setting subClusterId in YARN Federation mode to specify specific subClusters."); protected final static Map ADMIN_USAGE = ImmutableMap.builder() .put("-refreshQueues", new UsageInfo("", @@ -338,37 +342,50 @@ public class RMAdminCLI extends HAAdmin { ResourceManagerAdministrationProtocol.class); } - private int refreshQueues() throws IOException, YarnException { + private int refreshQueues(String subClusterId) throws IOException, YarnException { // Refresh the queue properties 
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshQueuesRequest request = recordFactory.newRecordInstance(RefreshQueuesRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshQueues(request); return 0; } - private int refreshNodes(boolean graceful) throws IOException, YarnException { + private int refreshNodes(boolean graceful, String subClusterId) + throws IOException, YarnException { // Refresh the nodes ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest request = RefreshNodesRequest.newInstance( graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL); + if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshNodes(request); return 0; } - private int refreshNodes(int timeout, String trackingMode) + private int refreshNodes(int timeout, String trackingMode, String subClusterId) throws IOException, YarnException { boolean serverTracking = !"client".equals(trackingMode); // Graceful decommissioning with timeout ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesRequest gracefulRequest = RefreshNodesRequest .newInstance(DecommissionType.GRACEFUL, timeout); + if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) { + gracefulRequest.setSubClusterId(subClusterId); + } adminProtocol.refreshNodes(gracefulRequest); if (serverTracking) { return 0; } CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory .newRecordInstance(CheckForDecommissioningNodesRequest.class); + if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) { + checkForDecommissioningNodesRequest.setSubClusterId(subClusterId); + } long waitingTime; boolean nodesDecommissioning = true; // As RM enforces timeout automatically, client usually 
don't need @@ -408,6 +425,9 @@ public class RMAdminCLI extends HAAdmin { + " seconds, issuing forceful decommissioning command."); RefreshNodesRequest forcefulRequest = RefreshNodesRequest .newInstance(DecommissionType.FORCEFUL); + if (isYarnFederationEnabled(getConf()) && StringUtils.isNotBlank(subClusterId)) { + forcefulRequest.setSubClusterId(subClusterId); + } adminProtocol.refreshNodes(forcefulRequest); } else { System.out.println("Graceful decommissioning completed in " + waitingTime @@ -416,79 +436,100 @@ public class RMAdminCLI extends HAAdmin { return 0; } - private int refreshNodesResources() throws IOException, YarnException { + private int refreshNodesResources(String subClusterId) + throws IOException, YarnException { // Refresh the resources at the Nodemanager ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshNodesResourcesRequest request = - recordFactory.newRecordInstance(RefreshNodesResourcesRequest.class); + recordFactory.newRecordInstance(RefreshNodesResourcesRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshNodesResources(request); return 0; } - private int refreshNodes() throws IOException, YarnException { - return refreshNodes(false); + private int refreshNodes(String subClusterId) throws IOException, YarnException { + return refreshNodes(false, subClusterId); } - private int refreshUserToGroupsMappings() throws IOException, + private int refreshUserToGroupsMappings(String subClusterId) throws IOException, YarnException { // Refresh the user-to-groups mappings ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshUserToGroupsMappingsRequest request = recordFactory.newRecordInstance(RefreshUserToGroupsMappingsRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshUserToGroupsMappings(request); return 0; } - private int 
refreshSuperUserGroupsConfiguration() throws IOException, + private int refreshSuperUserGroupsConfiguration(String subClusterId) throws IOException, YarnException { // Refresh the super-user groups ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshSuperUserGroupsConfigurationRequest request = recordFactory.newRecordInstance(RefreshSuperUserGroupsConfigurationRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshSuperUserGroupsConfiguration(request); return 0; } - private int refreshAdminAcls() throws IOException, YarnException { + private int refreshAdminAcls(String subClusterId) throws IOException, YarnException { // Refresh the admin acls ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshAdminAclsRequest request = recordFactory.newRecordInstance(RefreshAdminAclsRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshAdminAcls(request); return 0; } - private int refreshServiceAcls() throws IOException, YarnException { + private int refreshServiceAcls(String subClusterId) throws IOException, YarnException { // Refresh the service acls ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshServiceAclsRequest request = recordFactory.newRecordInstance(RefreshServiceAclsRequest.class); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshServiceAcls(request); return 0; } - private int refreshClusterMaxPriority() throws IOException, YarnException { + private int refreshClusterMaxPriority(String subClusterId) throws IOException, YarnException { // Refresh cluster max priority ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); RefreshClusterMaxPriorityRequest request = recordFactory.newRecordInstance(RefreshClusterMaxPriorityRequest.class); + if 
(StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.refreshClusterMaxPriority(request); return 0; } private int updateNodeResource(String nodeIdStr, Resource resource, - int overCommitTimeout) throws YarnException, IOException { + int overCommitTimeout, String subClusterId) throws YarnException, IOException { ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); UpdateNodeResourceRequest request = recordFactory.newRecordInstance(UpdateNodeResourceRequest.class); NodeId nodeId = NodeId.fromString(nodeIdStr); - Map resourceMap = - new HashMap(); + Map resourceMap = new HashMap<>(); resourceMap.put( nodeId, ResourceOption.newInstance(resource, overCommitTimeout)); request.setNodeResourceMap(resourceMap); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.updateNodeResource(request); return 0; } @@ -551,8 +592,10 @@ public class RMAdminCLI extends HAAdmin { "Add to cluster node labels."); opts.addOption("directlyAccessNodeLabelStore", false, "Directly access node label store."); + opts.addOption(OPTION_SUBCLUSTERID); + int exitCode = -1; - CommandLine cliParser = null; + CommandLine cliParser; try { cliParser = new GnuParser().parse(opts, args); } catch (MissingArgumentException ex) { @@ -561,6 +604,7 @@ public class RMAdminCLI extends HAAdmin { return exitCode; } + String subClusterId = parseSubClusterId(cliParser); List labels = YarnClientUtils.buildNodeLabelsFromStr( cliParser.getOptionValue("addToClusterNodeLabels")); if (cliParser.hasOption("directlyAccessNodeLabelStore")) { @@ -570,6 +614,9 @@ public class RMAdminCLI extends HAAdmin { createAdminProtocol(); AddToClusterNodeLabelsRequest request = AddToClusterNodeLabelsRequest.newInstance(labels); + if (StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.addToClusterNodeLabels(request); } return 0; @@ -582,8 +629,9 @@ public class RMAdminCLI 
extends HAAdmin { "Remove From cluster node labels."); opts.addOption("directlyAccessNodeLabelStore", false, "Directly access node label store."); + opts.addOption(OPTION_SUBCLUSTERID); int exitCode = -1; - CommandLine cliParser = null; + CommandLine cliParser; try { cliParser = new GnuParser().parse(opts, args); } catch (MissingArgumentException ex) { @@ -592,6 +640,7 @@ public class RMAdminCLI extends HAAdmin { return exitCode; } + String subClusterId = parseSubClusterId(cliParser); Set labels = buildNodeLabelNamesFromStr( cliParser.getOptionValue("removeFromClusterNodeLabels")); if (cliParser.hasOption("directlyAccessNodeLabelStore")) { @@ -602,6 +651,9 @@ public class RMAdminCLI extends HAAdmin { createAdminProtocol(); RemoveFromClusterNodeLabelsRequest request = RemoveFromClusterNodeLabelsRequest.newInstance(labels); + if(StringUtils.isNotBlank(subClusterId)) { + request.setSubClusterId(subClusterId); + } adminProtocol.removeFromClusterNodeLabels(request); } @@ -666,6 +718,7 @@ public class RMAdminCLI extends HAAdmin { "Fail on unknown nodes."); opts.addOption("directlyAccessNodeLabelStore", false, "Directly access node label store."); + opts.addOption(OPTION_SUBCLUSTERID); int exitCode = -1; CommandLine cliParser = null; try { @@ -678,13 +731,15 @@ public class RMAdminCLI extends HAAdmin { Map> map = buildNodeLabelsMapFromStr( cliParser.getOptionValue("replaceLabelsOnNode")); + String subClusterId = parseSubClusterId(cliParser); return replaceLabelsOnNodes(map, cliParser.hasOption("failOnUnknownNodes"), - cliParser.hasOption("directlyAccessNodeLabelStore")); + cliParser.hasOption("directlyAccessNodeLabelStore"), subClusterId); } private int replaceLabelsOnNodes(Map> map, - boolean failOnUnknownNodes, boolean directlyAccessNodeLabelStore) + boolean failOnUnknownNodes, boolean directlyAccessNodeLabelStore, + String subClusterId) throws IOException, YarnException { if (directlyAccessNodeLabelStore) { 
getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map); @@ -694,6 +749,9 @@ public class RMAdminCLI extends HAAdmin { ReplaceLabelsOnNodeRequest request = ReplaceLabelsOnNodeRequest.newInstance(map); request.setFailOnUnknownNodes(failOnUnknownNodes); + if (StringUtils.isNotBlank(subClusterId) && isYarnFederationEnabled(getConf())) { + request.setSubClusterId(subClusterId); + } adminProtocol.replaceLabelsOnNode(request); } return 0; @@ -739,39 +797,54 @@ public class RMAdminCLI extends HAAdmin { // // verify that we have enough command line parameters // + String subClusterId = StringUtils.EMPTY; if ("-refreshAdminAcls".equals(cmd) || "-refreshQueues".equals(cmd) || "-refreshNodesResources".equals(cmd) || "-refreshServiceAcl".equals(cmd) || "-refreshUserToGroupsMappings".equals(cmd) || - "-refreshSuperUserGroupsConfiguration".equals(cmd)) { - if (args.length != 1) { + "-refreshSuperUserGroupsConfiguration".equals(cmd) || + "-refreshClusterMaxPriority".equals(cmd)) { + subClusterId = parseSubClusterId(args, isHAEnabled); + // If we enable Federation mode, the number of args may be either one or three. + // Example: -refreshQueues or -refreshQueues -subClusterId SC-1 + if (isYarnFederationEnabled(getConf()) && args.length != 1 && args.length != 3) { + printUsage(cmd, isHAEnabled); + return exitCode; + } else if (!isYarnFederationEnabled(getConf()) && args.length != 1) { + // If Federation mode is not enabled, then the number of args can only be one. 
+ // Example: -refreshQueues printUsage(cmd, isHAEnabled); return exitCode; } } + // If it is federation mode, we will print federation mode information + if (isYarnFederationEnabled(getConf())) { + System.out.println("Using YARN Federation mode."); + } + try { if ("-refreshQueues".equals(cmd)) { - exitCode = refreshQueues(); + exitCode = refreshQueues(subClusterId); } else if ("-refreshNodes".equals(cmd)) { exitCode = handleRefreshNodes(args, cmd, isHAEnabled); } else if ("-refreshNodesResources".equals(cmd)) { - exitCode = refreshNodesResources(); + exitCode = refreshNodesResources(subClusterId); } else if ("-refreshUserToGroupsMappings".equals(cmd)) { - exitCode = refreshUserToGroupsMappings(); + exitCode = refreshUserToGroupsMappings(subClusterId); } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) { - exitCode = refreshSuperUserGroupsConfiguration(); + exitCode = refreshSuperUserGroupsConfiguration(subClusterId); } else if ("-refreshAdminAcls".equals(cmd)) { - exitCode = refreshAdminAcls(); + exitCode = refreshAdminAcls(subClusterId); } else if ("-refreshServiceAcl".equals(cmd)) { - exitCode = refreshServiceAcls(); + exitCode = refreshServiceAcls(subClusterId); } else if ("-refreshClusterMaxPriority".equals(cmd)) { - exitCode = refreshClusterMaxPriority(); + exitCode = refreshClusterMaxPriority(subClusterId); } else if ("-getGroups".equals(cmd)) { String[] usernames = Arrays.copyOfRange(args, i, args.length); exitCode = getGroups(usernames); } else if ("-updateNodeResource".equals(cmd)) { - exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled); + exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled, subClusterId); } else if ("-addToClusterNodeLabels".equals(cmd)) { exitCode = handleAddToClusterNodeLabels(args, cmd, isHAEnabled); } else if ("-removeFromClusterNodeLabels".equals(cmd)) { @@ -827,6 +900,7 @@ public class RMAdminCLI extends HAAdmin { "Indicates the timeout tracking should be handled by the client."); opts.addOption("server", 
false, "Indicates the timeout tracking should be handled by the RM."); + opts.addOption(OPTION_SUBCLUSTERID); int exitCode = -1; CommandLine cliParser = null; @@ -839,6 +913,7 @@ public class RMAdminCLI extends HAAdmin { } int timeout = -1; + String subClusterId = parseSubClusterId(cliParser); if (cliParser.hasOption("g")) { String strTimeout = cliParser.getOptionValue("g"); if (strTimeout != null) { @@ -853,9 +928,9 @@ public class RMAdminCLI extends HAAdmin { printUsage(cmd, isHAEnabled); return -1; } - return refreshNodes(timeout, trackingMode); + return refreshNodes(timeout, trackingMode, subClusterId); } else { - return refreshNodes(); + return refreshNodes(subClusterId); } } @@ -875,7 +950,7 @@ public class RMAdminCLI extends HAAdmin { * @throws YarnException if any issues thrown from server */ private int handleUpdateNodeResource( - String[] args, String cmd, boolean isHAEnabled) + String[] args, String cmd, boolean isHAEnabled, String subClusterId) throws YarnException, IOException { int i = 1; int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT; @@ -913,7 +988,7 @@ public class RMAdminCLI extends HAAdmin { if (i == args.length - 1) { overCommitTimeout = Integer.parseInt(args[i]); } - return updateNodeResource(nodeID, resource, overCommitTimeout); + return updateNodeResource(nodeID, resource, overCommitTimeout, subClusterId); } private Resource parseCommandAndCreateResource(String resourceTypes) { @@ -1039,6 +1114,69 @@ public class RMAdminCLI extends HAAdmin { return "Usage: rmadmin"; } + /** + * Parse subClusterId. + * This method will only parse subClusterId when Yarn Federation mode is enabled. + * + * @param cliParser CommandLine. + * @return subClusterId. + */ + protected String parseSubClusterId(CommandLine cliParser) { + // If YARN Federation mode is not enabled, return empty.
+ if (!isYarnFederationEnabled(getConf())) { + return StringUtils.EMPTY; + } + + String subClusterId = cliParser.getOptionValue(OPTION_SUBCLUSTERID); + if (StringUtils.isBlank(subClusterId)) { + return StringUtils.EMPTY; + } + + System.out.println("SubClusterId : " + subClusterId); + return subClusterId; + } + + /** + * Parse subClusterId from the raw arguments of a refresh command. + * This method will only parse subClusterId when Yarn Federation mode is enabled. + * + * @param args command line arguments; args[0] is the command name. + * @param isHAEnabled passed to printUsage when the arguments cannot be parsed. + * @return subClusterId, or empty if it is absent or the arguments cannot be parsed. + */ + protected String parseSubClusterId(String[] args, boolean isHAEnabled) { + // If YARN Federation mode is not enabled, return empty. + if (!isYarnFederationEnabled(getConf())) { + return StringUtils.EMPTY; + } + + // Register every command that run() passes to this method; the parser + // rejects an unregistered command as an unrecognized option. + Options opts = new Options(); + opts.addOption("refreshQueues", false, + "Refresh the hosts information at the ResourceManager."); + opts.addOption("refreshNodesResources", false, ""); + opts.addOption("refreshUserToGroupsMappings", false, ""); + opts.addOption("refreshSuperUserGroupsConfiguration", false, ""); + opts.addOption("refreshAdminAcls", false, ""); + opts.addOption("refreshServiceAcl", false, ""); + opts.addOption("refreshClusterMaxPriority", false, ""); + opts.addOption("updateNodeResource", false, ""); + opts.addOption(OPTION_SUBCLUSTERID); + + CommandLine cliParser; + try { + cliParser = new DefaultParser().parse(opts, args); + } catch (ParseException ex) { + System.err.println("parseSubClusterId error, " + ex.getMessage()); + printUsage(args[0], isHAEnabled); + // There is no CommandLine to read from after a parse failure. + return StringUtils.EMPTY; + } + + return parseSubClusterId(cliParser); + } + public static void main(String[] args) throws Exception { int result = ToolRunner.run(new RMAdminCLI(), args); System.exit(result); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java index 411526db41..40cde4fe11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java @@ -1099,4 +1099,27 @@ public class TestRMAdminCLI { assertFalse(errOut.contains("-failover")); dataErr.reset(); } + +
@Test + public void testParseSubClusterId() throws Exception { + rmAdminCLI.getConf().setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + + // replaceLabelsOnNode + String[] replaceLabelsOnNodeArgs = {"-replaceLabelsOnNode", + "node1:8000,x node2:8000=y node3,x node4=Y", "-subClusterId", "SC-1"}; + assertEquals(0, rmAdminCLI.run(replaceLabelsOnNodeArgs)); + + String[] refreshQueuesArgs = {"-refreshQueues", "-subClusterId", "SC-1"}; + assertEquals(0, rmAdminCLI.run(refreshQueuesArgs)); + + String[] refreshNodesResourcesArgs = {"-refreshNodesResources", "-subClusterId", "SC-1"}; + assertEquals(0, rmAdminCLI.run(refreshNodesResourcesArgs)); + + String nodeIdStr = "0.0.0.0:0"; + String resourceTypes = "memory-mb=1024Mi,vcores=1,resource2"; + String[] updateNodeResourceArgs = {"-updateNodeResource", nodeIdStr, + resourceTypes, "-subClusterId", "SC-1"}; + rmAdminCLI.parseSubClusterId(updateNodeResourceArgs, false); + assertEquals(-1, rmAdminCLI.run(updateNodeResourceArgs)); + } }