YARN-11517. Improve Federation#RouterCLI DeregisterSubCluster Code. (#5766)

This commit is contained in:
slfan1989 2023-06-27 00:43:49 +08:00 committed by GitHub
parent 161f80810f
commit a4cf4c3778
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 52 deletions

View File

@ -23,6 +23,7 @@
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.ha.HAAdmin.UsageInfo; import org.apache.hadoop.ha.HAAdmin.UsageInfo;
@ -30,6 +31,7 @@
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.util.FormattingCLIUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol; import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
@ -38,6 +40,11 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters; import org.apache.hadoop.yarn.server.api.protocolrecords.DeregisterSubClusters;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -45,18 +52,27 @@ public class RouterCLI extends Configured implements Tool {
protected final static Map<String, UsageInfo> ADMIN_USAGE = protected final static Map<String, UsageInfo> ADMIN_USAGE =
ImmutableMap.<String, UsageInfo>builder().put("-deregisterSubCluster", ImmutableMap.<String, UsageInfo>builder().put("-deregisterSubCluster",
new UsageInfo("[-sc|subClusterId [subCluster id]]", new UsageInfo("[-sc|--subClusterId [subCluster Id]]",
"deregister subCluster, if the interval between the heartbeat time of the subCluster " + "Deregister SubCluster, If the interval between the heartbeat time of the subCluster " +
"and the current time exceeds the timeout period, " + "and the current time exceeds the timeout period, " +
"set the state of the subCluster to SC_LOST")).build(); "set the state of the subCluster to SC_LOST")).build();
// title information // Command Constant
private final static String SUB_CLUSTER_ID = "SubClusterId"; private static final String CMD_EMPTY = "";
private final static String DEREGISTER_STATE = "DeregisterState"; private static final int EXIT_SUCCESS = 0;
private final static String LAST_HEARTBEAT_TIME = "LastHeartBeatTime"; private static final int EXIT_ERROR = -1;
private final static String INFORMATION = "Information";
private final static String SUB_CLUSTER_STATE = "SubClusterState"; // Command1: deregisterSubCluster
private static final String DEREGISTER_SUBCLUSTER_PATTERN = "%30s\t%20s\t%30s\t%30s\t%20s"; private static final String DEREGISTER_SUBCLUSTER_TITLE =
"Yarn Federation Deregister SubCluster";
// Columns information
private static final List<String> DEREGISTER_SUBCLUSTER_HEADER = Arrays.asList(
"SubCluster Id", "Deregister State", "Last HeartBeatTime", "Information", "SubCluster State");
// Constant
private static final String OPTION_SC = "sc";
private static final String OPTION_SUBCLUSTERID = "subClusterId";
private static final String CMD_DEREGISTERSUBCLUSTER = "-deregisterSubCluster";
private static final String CMD_HELP = "-help";
public RouterCLI() { public RouterCLI() {
super(); super();
@ -109,18 +125,20 @@ private static void buildIndividualUsageMsg(String cmd, StringBuilder builder) {
private static void printHelp() { private static void printHelp() {
StringBuilder summary = new StringBuilder(); StringBuilder summary = new StringBuilder();
summary.append("router-admin is the command to execute ") summary.append("routeradmin is the command to execute ")
.append("YARN Federation administrative commands.\n"); .append("YARN Federation administrative commands.\n")
summary.append("The full syntax is: \n\n") .append("The full syntax is: \n\n")
.append("routeradmin") .append("routeradmin")
.append(" [-deregisterSubCluster [-c|clusterId [subClusterId]]"); .append(" [-deregisterSubCluster [-sc|--subClusterId [subCluster Id]]")
summary.append(" [-help [cmd]]").append("\n"); .append(" [-help [cmd]]").append("\n");
StringBuilder helpBuilder = new StringBuilder(); StringBuilder helpBuilder = new StringBuilder();
System.out.println(summary); System.out.println(summary);
for (String cmdKey : ADMIN_USAGE.keySet()) { for (String cmdKey : ADMIN_USAGE.keySet()) {
buildHelpMsg(cmdKey, helpBuilder); buildHelpMsg(cmdKey, helpBuilder);
helpBuilder.append("\n"); helpBuilder.append("\n");
} }
helpBuilder.append(" -help [cmd]: Displays help for the given command or all commands") helpBuilder.append(" -help [cmd]: Displays help for the given command or all commands")
.append(" if none is specified."); .append(" if none is specified.");
System.out.println(helpBuilder); System.out.println(helpBuilder);
@ -136,8 +154,8 @@ protected ResourceManagerAdministrationProtocol createAdminProtocol()
} }
private static void buildUsageMsg(StringBuilder builder) { private static void buildUsageMsg(StringBuilder builder) {
builder.append("router-admin is only used in Yarn Federation Mode.\n"); builder.append("routeradmin is only used in Yarn Federation Mode.\n");
builder.append("Usage: router-admin\n"); builder.append("Usage: routeradmin\n");
for (Map.Entry<String, UsageInfo> cmdEntry : ADMIN_USAGE.entrySet()) { for (Map.Entry<String, UsageInfo> cmdEntry : ADMIN_USAGE.entrySet()) {
UsageInfo usageInfo = cmdEntry.getValue(); UsageInfo usageInfo = cmdEntry.getValue();
builder.append(" ") builder.append(" ")
@ -160,28 +178,52 @@ private static void printUsage(String cmd) {
ToolRunner.printGenericCommandUsage(System.err); ToolRunner.printGenericCommandUsage(System.err);
} }
/**
* According to the parameter Deregister SubCluster.
*
* @param args parameter array.
* @return If the Deregister SubCluster operation is successful,
* it will return 0. Otherwise, it will return -1.
*
* @throws IOException raised on errors performing I/O.
* @throws YarnException exceptions from yarn servers.
* @throws ParseException Exceptions thrown during parsing of a command-line.
*/
private int handleDeregisterSubCluster(String[] args) private int handleDeregisterSubCluster(String[] args)
throws IOException, YarnException, ParseException { throws IOException, YarnException, ParseException {
// Prepare Options.
Options opts = new Options(); Options opts = new Options();
opts.addOption("deregisterSubCluster", false, opts.addOption("deregisterSubCluster", false,
"Refresh the hosts information at the ResourceManager."); "Deregister YARN subCluster, if subCluster Heartbeat Timeout.");
Option gracefulOpt = new Option("c", "clusterId", true, Option subClusterOpt = new Option(OPTION_SC, OPTION_SUBCLUSTERID, true,
"Wait for timeout before marking the NodeManager as decommissioned."); "The subCluster can be specified using either the '-sc' or '--subCluster' option. " +
gracefulOpt.setOptionalArg(true); " If the subCluster's Heartbeat Timeout, it will be marked as 'SC_LOST'.");
opts.addOption(gracefulOpt); subClusterOpt.setOptionalArg(true);
opts.addOption(subClusterOpt);
// Parse command line arguments.
CommandLine cliParser; CommandLine cliParser;
try { try {
cliParser = new GnuParser().parse(opts, args); cliParser = new GnuParser().parse(opts, args);
} catch (MissingArgumentException ex) { } catch (MissingArgumentException ex) {
System.out.println("Missing argument for options"); System.out.println("Missing argument for options");
printUsage(args[0]); printUsage(args[0]);
return -1; return EXIT_ERROR;
} }
if (cliParser.hasOption("c")) { // Try to parse the subClusterId.
String subClusterId = cliParser.getOptionValue("c"); String subClusterId = null;
if (cliParser.hasOption(OPTION_SC) || cliParser.hasOption(OPTION_SUBCLUSTERID)) {
subClusterId = cliParser.getOptionValue(OPTION_SC);
if (subClusterId == null) {
subClusterId = cliParser.getOptionValue(OPTION_SUBCLUSTERID);
}
}
// If subClusterId is not empty, try deregisterSubCluster subCluster,
// otherwise try deregisterSubCluster all subCluster.
if (StringUtils.isNotBlank(subClusterId)) {
return deregisterSubCluster(subClusterId); return deregisterSubCluster(subClusterId);
} else { } else {
return deregisterSubCluster(); return deregisterSubCluster();
@ -190,12 +232,14 @@ private int handleDeregisterSubCluster(String[] args)
private int deregisterSubCluster(String subClusterId) private int deregisterSubCluster(String subClusterId)
throws IOException, YarnException { throws IOException, YarnException {
PrintWriter writer = new PrintWriter(new OutputStreamWriter(
System.out, Charset.forName(StandardCharsets.UTF_8.name())));
ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol(); ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
DeregisterSubClusterRequest request = DeregisterSubClusterRequest request =
DeregisterSubClusterRequest.newInstance(subClusterId); DeregisterSubClusterRequest.newInstance(subClusterId);
DeregisterSubClusterResponse response = adminProtocol.deregisterSubCluster(request); DeregisterSubClusterResponse response = adminProtocol.deregisterSubCluster(request);
System.out.println(String.format(DEREGISTER_SUBCLUSTER_PATTERN, FormattingCLIUtils formattingCLIUtils = new FormattingCLIUtils(DEREGISTER_SUBCLUSTER_TITLE)
SUB_CLUSTER_ID, DEREGISTER_STATE, LAST_HEARTBEAT_TIME, INFORMATION, SUB_CLUSTER_STATE)); .addHeaders(DEREGISTER_SUBCLUSTER_HEADER);
List<DeregisterSubClusters> deregisterSubClusters = response.getDeregisterSubClusters(); List<DeregisterSubClusters> deregisterSubClusters = response.getDeregisterSubClusters();
deregisterSubClusters.forEach(deregisterSubCluster -> { deregisterSubClusters.forEach(deregisterSubCluster -> {
String responseSubClusterId = deregisterSubCluster.getSubClusterId(); String responseSubClusterId = deregisterSubCluster.getSubClusterId();
@ -203,15 +247,17 @@ private int deregisterSubCluster(String subClusterId)
String lastHeartBeatTime = deregisterSubCluster.getLastHeartBeatTime(); String lastHeartBeatTime = deregisterSubCluster.getLastHeartBeatTime();
String info = deregisterSubCluster.getInformation(); String info = deregisterSubCluster.getInformation();
String subClusterState = deregisterSubCluster.getSubClusterState(); String subClusterState = deregisterSubCluster.getSubClusterState();
System.out.println(String.format(DEREGISTER_SUBCLUSTER_PATTERN, formattingCLIUtils.addLine(responseSubClusterId, deregisterState,
responseSubClusterId, deregisterState, lastHeartBeatTime, info, subClusterState)); lastHeartBeatTime, info, subClusterState);
}); });
return 0; writer.print(formattingCLIUtils.render());
writer.flush();
return EXIT_SUCCESS;
} }
private int deregisterSubCluster() throws IOException, YarnException { private int deregisterSubCluster() throws IOException, YarnException {
deregisterSubCluster(""); deregisterSubCluster(CMD_EMPTY);
return 0; return EXIT_SUCCESS;
} }
@Override @Override
@ -222,25 +268,26 @@ public int run(String[] args) throws Exception {
YarnConfiguration.DEFAULT_FEDERATION_ENABLED); YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
if (args.length < 1 || !isFederationEnabled) { if (args.length < 1 || !isFederationEnabled) {
printUsage(""); printUsage(CMD_EMPTY);
return -1; return EXIT_ERROR;
} }
String cmd = args[0]; String cmd = args[0];
if ("-help".equals(cmd)) {
if (CMD_HELP.equals(cmd)) {
if (args.length > 1) { if (args.length > 1) {
printUsage(args[1]); printUsage(args[1]);
} else { } else {
printHelp(); printHelp();
} }
return 0; return EXIT_SUCCESS;
} }
if ("-deregisterSubCluster".equals(cmd)) { if (CMD_DEREGISTERSUBCLUSTER.equals(cmd)) {
return handleDeregisterSubCluster(args); return handleDeregisterSubCluster(args);
} }
return 0; return EXIT_SUCCESS;
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -103,15 +103,17 @@ private DeregisterSubClusterResponse generateAllSubClusterData() {
@Test @Test
public void testHelp() throws Exception { public void testHelp() throws Exception {
PrintStream oldOutPrintStream = System.out;
PrintStream oldErrPrintStream = System.err;
ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
ByteArrayOutputStream dataErr = new ByteArrayOutputStream(); ByteArrayOutputStream dataErr = new ByteArrayOutputStream();
System.setOut(new PrintStream(dataOut)); System.setOut(new PrintStream(dataOut));
System.setErr(new PrintStream(dataErr)); System.setErr(new PrintStream(dataErr));
String[] args = {"-help"}; String[] args = {"-help"};
rmAdminCLI.run(args);
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-help", "-deregisterSubCluster"};
rmAdminCLI.run(args);
} }
@Test @Test
@ -120,7 +122,10 @@ public void testDeregisterSubCluster() throws Exception {
ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); ByteArrayOutputStream dataOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(dataOut)); System.setOut(new PrintStream(dataOut));
oldOutPrintStream.println(dataOut); oldOutPrintStream.println(dataOut);
String[] args = {"-deregisterSubCluster", "-c", "SC-1"}; String[] args = {"-deregisterSubCluster", "-sc", "SC-1"};
assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-deregisterSubCluster", "--subClusterId", "SC-1"};
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
} }
@ -134,10 +139,17 @@ public void testDeregisterSubClusters() throws Exception {
String[] args = {"-deregisterSubCluster"}; String[] args = {"-deregisterSubCluster"};
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-deregisterSubCluster", "-c"}; args = new String[]{"-deregisterSubCluster", "-sc"};
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-deregisterSubCluster", "-c", ""}; args = new String[]{"-deregisterSubCluster", "--sc", ""};
assertEquals(0, rmAdminCLI.run(args)); assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-deregisterSubCluster", "--subClusterId"};
assertEquals(0, rmAdminCLI.run(args));
args = new String[]{"-deregisterSubCluster", "--subClusterId", ""};
assertEquals(0, rmAdminCLI.run(args));
} }
} }

View File

@ -870,7 +870,7 @@ public DeregisterSubClusterResponse deregisterSubCluster(DeregisterSubClusterReq
*/ */
private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) { private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {
DeregisterSubClusters deregisterSubClusters = null; DeregisterSubClusters deregisterSubClusters;
try { try {
// Step1. Get subCluster information. // Step1. Get subCluster information.
@ -879,9 +879,9 @@ private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {
SubClusterState subClusterState = subClusterInfo.getState(); SubClusterState subClusterState = subClusterInfo.getState();
long lastHeartBeat = subClusterInfo.getLastHeartBeat(); long lastHeartBeat = subClusterInfo.getLastHeartBeat();
Date lastHeartBeatDate = new Date(lastHeartBeat); Date lastHeartBeatDate = new Date(lastHeartBeat);
deregisterSubClusters = DeregisterSubClusters.newInstance( deregisterSubClusters = DeregisterSubClusters.newInstance(
reqSubClusterId, "UNKNOWN", lastHeartBeatDate.toString(), "", subClusterState.name()); reqSubClusterId, "NONE", lastHeartBeatDate.toString(),
"Normal Heartbeat", subClusterState.name());
// Step2. Deregister subCluster. // Step2. Deregister subCluster.
if (subClusterState.isUsable()) { if (subClusterState.isUsable()) {
@ -891,11 +891,12 @@ private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {
long heartBearTimeInterval = Time.now() - lastHeartBeat; long heartBearTimeInterval = Time.now() - lastHeartBeat;
if (heartBearTimeInterval - heartbeatExpirationMillis < 0) { if (heartBearTimeInterval - heartbeatExpirationMillis < 0) {
boolean deregisterSubClusterFlag = boolean deregisterSubClusterFlag =
federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST); federationFacade.deregisterSubCluster(subClusterId, SubClusterState.SC_LOST);
if (deregisterSubClusterFlag) { if (deregisterSubClusterFlag) {
deregisterSubClusters.setDeregisterState("SUCCESS"); deregisterSubClusters.setDeregisterState("SUCCESS");
deregisterSubClusters.setSubClusterState("SC_LOST"); deregisterSubClusters.setSubClusterState("SC_LOST");
deregisterSubClusters.setInformation("Heartbeat Time >= 30 minutes."); deregisterSubClusters.setInformation("Heartbeat Time >= " +
heartbeatExpirationMillis / (1000 * 60) + "minutes");
} else { } else {
deregisterSubClusters.setDeregisterState("FAILED"); deregisterSubClusters.setDeregisterState("FAILED");
deregisterSubClusters.setInformation("DeregisterSubClusters Failed."); deregisterSubClusters.setInformation("DeregisterSubClusters Failed.");
@ -903,16 +904,16 @@ private DeregisterSubClusters deregisterSubCluster(String reqSubClusterId) {
} }
} else { } else {
deregisterSubClusters.setDeregisterState("FAILED"); deregisterSubClusters.setDeregisterState("FAILED");
deregisterSubClusters.setInformation("Heartbeat Time < 30 minutes. " + deregisterSubClusters.setInformation("The subCluster is Unusable, " +
"DeregisterSubCluster does not need to be executed"); "So it can't be Deregistered");
LOG.warn("SubCluster {} in State {} does not need to update state.", LOG.warn("The SubCluster {} is Unusable (SubClusterState:{}), So it can't be Deregistered",
subClusterId, subClusterState); subClusterId, subClusterState);
} }
return deregisterSubClusters; return deregisterSubClusters;
} catch (YarnException e) { } catch (YarnException e) {
LOG.error("SubCluster {} DeregisterSubCluster Failed", reqSubClusterId, e); LOG.error("SubCluster {} DeregisterSubCluster Failed", reqSubClusterId, e);
deregisterSubClusters = DeregisterSubClusters.newInstance( deregisterSubClusters = DeregisterSubClusters.newInstance(
reqSubClusterId, "FAILED", "UNKNOWN", e.getMessage(), "UNKNOWN"); reqSubClusterId, "FAILED", "UNKNOWN", e.getMessage(), "UNKNOWN");
return deregisterSubClusters; return deregisterSubClusters;
} }
} }