HDFS-17361. DiskBalancer: Query command support with multiple nodes (#6508)
Signed-off-by: Takanobu Asanuma <tasanuma@apache.org>
parent 9751b6e41a
commit 03d9acaa86
@@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.server.diskbalancer.command;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
@@ -30,6 +32,11 @@
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.net.NetUtils;

import java.io.PrintStream;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/**
* Gets the current status of disk balancer command.
*/
@@ -41,9 +48,13 @@ public class QueryCommand extends Command {
* @param conf - Configuration.
*/
public QueryCommand(Configuration conf) {
super(conf);
this(conf, System.out);
}

public QueryCommand(Configuration conf, final PrintStream ps) {
super(conf, ps);
addValidCommandParameters(DiskBalancerCLI.QUERY,
"Queries the status of disk plan running on a given datanode.");
"Queries the status of disk plan running on given datanode(s).");
addValidCommandParameters(DiskBalancerCLI.VERBOSE,
"Prints verbose results.");
}
@@ -56,37 +67,55 @@ public QueryCommand(Configuration conf) {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"query plan\" command.");
TextStringBuilder result = new TextStringBuilder();
Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
Preconditions.checkNotNull(nodeName);
nodeName = nodeName.trim();
String nodeAddress = nodeName;

// if the string is not name:port format use the default port.
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
int defaultIPC = NetUtils.createSocketAddr(
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
nodeAddress = nodeName + ":" + defaultIPC;
LOG.debug("Using default data node port : {}", nodeAddress);
String nodeVal = cmd.getOptionValue(DiskBalancerCLI.QUERY);
if (StringUtils.isBlank(nodeVal)) {
String warnMsg = "The number of input nodes is 0. "
+ "Please input the valid nodes.";
throw new DiskBalancerException(warnMsg,
DiskBalancerException.Result.INVALID_NODE);
}

ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
System.out.printf("Plan File: %s%nPlan ID: %s%nResult: %s%n",
workStatus.getPlanFile(),
workStatus.getPlanID(),
workStatus.getResult().toString());

if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
System.out.printf("%s", workStatus.currentStateString());
nodeVal = nodeVal.trim();
Set<String> resultSet = new TreeSet<>();
String[] nodes = nodeVal.split(",");
Collections.addAll(resultSet, nodes);
String outputLine = String.format(
"Get current status of the diskbalancer for DataNode(s). "
+ "These DataNode(s) are parsed from '%s'.", nodeVal);
recordOutput(result, outputLine);
for (String nodeName : resultSet) {
// if the string is not name:port format use the default port.
String nodeAddress = nodeName;
if (!nodeName.matches("[^\\:]+:[0-9]{2,5}")) {
int defaultIPC = NetUtils.createSocketAddr(
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
nodeAddress = nodeName + ":" + defaultIPC;
LOG.debug("Using default data node port : {}", nodeAddress);
}

ClientDatanodeProtocol dataNode = getDataNodeProxy(nodeAddress);
try {
DiskBalancerWorkStatus workStatus = dataNode.queryDiskBalancerPlan();
outputLine = String.format("DataNode: %s%nPlan File: %s%nPlan ID: %s%nResult: %s%n",
nodeAddress,
workStatus.getPlanFile(),
workStatus.getPlanID(),
workStatus.getResult().toString());
result.append(outputLine);
if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
outputLine = String.format("%s", workStatus.currentStateString());
result.append(outputLine);
}
result.append(System.lineSeparator());
} catch (DiskBalancerException ex) {
LOG.error("Query plan failed by {}", nodeAddress, ex);
throw ex;
}
} catch (DiskBalancerException ex) {
LOG.error("Query plan failed.", ex);
throw ex;
}
getPrintStream().println(result);
}

/**
@@ -94,14 +123,14 @@ public void execute(CommandLine cmd) throws Exception {
*/
@Override
public void printHelp() {
String header = "Query Plan queries a given data node about the " +
String header = "Query Plan queries given datanode(s) about the " +
"current state of disk balancer execution.\n\n";

String footer = "\nQuery command retrieves the plan ID and the current " +
"running state. ";

HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -query <hostname> [options]",
helpFormatter.printHelp("hdfs diskbalancer -query <hostname,hostname,...> " +
" [options]",
header, DiskBalancerCLI.getQueryOptions(), footer);
}
}
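Not part of this change, but for orientation: a minimal sketch of how the new two-argument constructor can be driven with a captured PrintStream, which is what the DiskBalancerCLI dispatch change and the new test rely on. The commons-cli parsing below only approximates what DiskBalancerCLI does, and `dn1:9867`/`dn2:9867` are placeholder addresses that must point at reachable DataNodes.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.diskbalancer.command.QueryCommand;
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;

public class QueryCommandSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);

    // Capture the per-node report instead of writing it to System.out.
    ByteArrayOutputStream captured = new ByteArrayOutputStream();
    PrintStream ps = new PrintStream(captured, true, "UTF-8");
    QueryCommand query = new QueryCommand(conf, ps);

    // Build a CommandLine carrying a comma-separated node list, roughly as
    // the CLI would. The addresses are illustrative placeholders.
    Options opts = new Options();
    opts.addOption(Option.builder().longOpt(DiskBalancerCLI.QUERY).hasArg().build());
    CommandLine cmd = new DefaultParser()
        .parse(opts, new String[] {"-query", "dn1:9867,dn2:9867"});

    query.execute(cmd);
    System.out.println(captured.toString("UTF-8"));
  }
}
```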
@@ -378,7 +378,7 @@ private void addQueryCommands(Options opt) {
Option query = Option.builder().longOpt(QUERY)
.hasArg()
.desc("Queries the disk balancer " +
"status of a given datanode.")
"status of given datanode(s).")
.build();
getQueryOptions().addOption(query);
opt.addOption(query);
@@ -387,7 +387,7 @@ private void addQueryCommands(Options opt) {
// added to global table.
Option verbose = Option.builder().longOpt(VERBOSE)
.desc("Prints details of the plan that is being executed " +
"on the node.")
"on the datanode(s).")
.build();
getQueryOptions().addOption(verbose);
}
@@ -482,7 +482,7 @@ private int dispatch(CommandLine cmd)
}

if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
dbCmd = new QueryCommand(getConf());
dbCmd = new QueryCommand(getConf(), this.printStream);
}

if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
@@ -86,9 +86,9 @@ So, query command can help to get the current status of execute command.

### Query

Query command gets the current status of the diskbalancer from a datanode.
Query command gets the current status of the diskbalancer from specified node(s).

`hdfs diskbalancer -query nodename.mycluster.com`
`hdfs diskbalancer -query nodename1.mycluster.com,nodename2.mycluster.com,...`

| COMMAND\_OPTION | Description |
|:---- |:---- |
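For reference, the recordOutput line and per-node format string added in QueryCommand above imply a report of roughly this shape when two nodes are queried (values are illustrative placeholders, not captured from a run; the blank line comes from the System.lineSeparator() appended after each node):

```
Get current status of the diskbalancer for DataNode(s). These DataNode(s) are parsed from 'dn1:9867,dn2:9867'.
DataNode: dn1:9867
Plan File: <plan file>
Plan ID: <plan id>
Result: <result>

DataNode: dn2:9867
Plan File: <plan file>
Plan ID: <plan id>
Result: <result>
```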
@@ -814,17 +814,44 @@ private List<String> runCommand(
return runCommandInternal(cmdLine, clusterConf);
}

/**
* Making sure that we can query multiple nodes without having done a submit.
* @throws Exception
*/
@Test
public void testDiskBalancerQueryWithoutSubmitAndMultipleNodes() throws Exception {
Configuration hdfsConf = new HdfsConfiguration();
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
.numDataNodes(numDatanodes).build();
try {
miniDFSCluster.waitActive();
DataNode dataNode1 = miniDFSCluster.getDataNodes().get(0);
DataNode dataNode2 = miniDFSCluster.getDataNodes().get(1);
final String queryArg = String.format("-query localhost:%d,localhost:%d", dataNode1
.getIpcPort(), dataNode2.getIpcPort());
final String cmdLine = String.format("hdfs diskbalancer %s", queryArg);
List<String> outputs = runCommand(cmdLine);
assertThat(outputs.get(1), containsString("localhost:" + dataNode1.getIpcPort()));
assertThat(outputs.get(6), containsString("localhost:" + dataNode2.getIpcPort()));
} finally {
miniDFSCluster.shutdown();
}
}

/**
* Making sure that we can query the node without having done a submit.
* @throws Exception
*/
@Test
public void testDiskBalancerQueryWithoutSubmit() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
Configuration hdfsConf = new HdfsConfiguration();
hdfsConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
final int numDatanodes = 2;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(conf, basedir)
MiniDFSCluster miniDFSCluster = new MiniDFSCluster.Builder(hdfsConf, basedir)
.numDataNodes(numDatanodes).build();
try {
miniDFSCluster.waitActive();