HDFS-10557. Fix handling of the -fs Generic option. (Arpit Agarwal)

Author: Arpit Agarwal
Date:   2016-06-22 08:23:45 -07:00
Parent: 7b23ad1ef7
Commit: 66fa34c839

7 changed files with 27 additions and 72 deletions

Command.java

@@ -45,9 +45,7 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
@@ -93,8 +91,7 @@ public Command(Configuration conf) {
    * Executes the Client Calls.
    *
    * @param cmd - CommandLine
-   * @throws IOException
-   * @throws URISyntaxException
+   * @throws Exception
    */
   public abstract void execute(CommandLine cmd) throws Exception;
 
@@ -103,22 +100,6 @@ public Command(Configuration conf) {
    */
   public abstract void printHelp();
 
-  /**
-   * verifies user provided URL.
-   *
-   * @param uri - UrlString
-   * @return URL
-   * @throws URISyntaxException, MalformedURLException
-   */
-  protected URI verifyURI(String uri)
-      throws URISyntaxException, MalformedURLException {
-    if ((uri == null) || uri.isEmpty()) {
-      throw new MalformedURLException(
-          "A valid URI is needed to execute this command.");
-    }
-    return new URI(uri);
-  }
-
   /**
    * Process the URI and return the cluster with nodes setup. This is used in
    * all commands.
@@ -130,11 +111,8 @@ protected URI verifyURI(String uri)
   protected DiskBalancerCluster readClusterInfo(CommandLine cmd) throws
       Exception {
     Preconditions.checkNotNull(cmd);
-    Preconditions
-        .checkState(cmd.getOptionValue(DiskBalancer.NAMENODEURI) != null,
-            "Required argument missing : uri");
-    setClusterURI(verifyURI(cmd.getOptionValue(DiskBalancer.NAMENODEURI)));
+    setClusterURI(FileSystem.getDefaultUri(getConf()));
     LOG.debug("using name node URI : {}", this.getClusterURI());
     ClusterConnector connector = ConnectorFactory.getCluster(this.clusterURI,
         getConf());
@@ -346,6 +324,7 @@ private static UserGroupInformation getUGI()
    *
    * @param fileName - fileName to open.
    * @return OutputStream.
+   * @throws IOException
    */
   protected FSDataOutputStream create(String fileName) throws IOException {
     Preconditions.checkNotNull(fileName);
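
The explicit NAMENODEURI handling removed above is redundant because `-fs` is one of Hadoop's generic options: it is applied to the Configuration before the tool's own argument parsing runs. A minimal sketch of that flow, assuming the `hdfs diskbalancer` entry point goes through ToolRunner like other Tool implementations (the demo class and argument values are illustrative, not part of this patch):

// Minimal sketch (not part of this patch) of how "-fs" reaches the tool.
// GenericOptionsParser and FileSystem.getDefaultUri are the real Hadoop
// APIs; the demo class and argument values are illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.GenericOptionsParser;

public class GenericFsOptionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] cli = {"-fs", "hdfs://namenode", "-plan", "node1"};
    // ToolRunner performs this step before calling Tool.run();
    // "-fs <uri>" becomes the default filesystem (fs.defaultFS).
    GenericOptionsParser parser = new GenericOptionsParser(conf, cli);
    String[] remaining = parser.getRemainingArgs();      // {"-plan", "node1"}
    // Same value Command#readClusterInfo now reads via getConf().
    System.out.println(FileSystem.getDefaultUri(conf));  // hdfs://namenode
    System.out.println(String.join(" ", remaining));
  }
}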

PlanCommand.java

@@ -63,10 +63,9 @@ public PlanCommand(Configuration conf) {
     this.thresholdPercentage = 1;
     this.bandwidth = 0;
     this.maxError = 0;
-    addValidCommandParameters(DiskBalancer.NAMENODEURI, "Name Node URI or " +
-        "file URI for cluster");
-
-    addValidCommandParameters(DiskBalancer.OUTFILE, "Output file");
+    addValidCommandParameters(DiskBalancer.OUTFILE, "Output directory in " +
+        "HDFS. The generated plan will be written to a file in this " +
+        "directory.");
     addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
         "be used while copying.");
     addValidCommandParameters(DiskBalancer.THRESHOLD, "Percentage skew that " +
@@ -188,7 +187,7 @@ private void populatePathNames(DiskBalancerDataNode node) throws IOException {
    */
   @Override
   public void printHelp() {
-    String header = "creates a plan that describes how much data should be " +
+    String header = "Creates a plan that describes how much data should be " +
        "moved between disks.\n\n";

    String footer = "\nPlan command creates a set of steps that represent a " +
@@ -196,7 +195,7 @@ public void printHelp() {
        " will balance the data.";

    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.printHelp("hdfs diskbalancer -uri <namenode> -plan " +
+    helpFormatter.printHelp("hdfs diskbalancer -plan " +
        "<hostname> [options]", header, DiskBalancer.getPlanOptions(), footer);
   }

ReportCommand.java

@@ -201,9 +201,9 @@ public void printHelp() {
        " datanode, or prints out the list of nodes that will benefit from " +
        "running disk balancer. Top defaults to " + getDefaultTop();
     String footer = ". E.g.:\n"
-        + "hdfs diskbalancer -fs http://namenode.uri -report\n"
-        + "hdfs diskbalancer -fs http://namenode.uri -report -top 5\n"
-        + "hdfs diskbalancer -fs http://namenode.uri -report "
+        + "hdfs diskbalancer -report\n"
+        + "hdfs diskbalancer -report -top 5\n"
+        + "hdfs diskbalancer -report "
        + "-node {DataNodeID | IP | Hostname}";

     HelpFormatter helpFormatter = new HelpFormatter();

GreedyPlanner.java

@@ -67,7 +67,8 @@ public NodePlan plan(DiskBalancerDataNode node) throws Exception {
     long startTime = Time.monotonicNow();
     NodePlan plan = new NodePlan(node.getDataNodeName(),
         node.getDataNodePort());
-    LOG.info("Starting plan for Node : " + node.getDataNodeUUID());
+    LOG.info("Starting plan for Node : {}:{}",
+        node.getDataNodeName(), node.getDataNodePort());
     while (node.isBalancingNeeded(this.threshold)) {
       for (DiskBalancerVolumeSet vSet : node.getVolumeSets().values()) {
         balanceVolumeSet(node, vSet, plan);
@@ -76,8 +77,9 @@ public NodePlan plan(DiskBalancerDataNode node) throws Exception {
     long endTime = Time.monotonicNow();
     String message = String
-        .format("Compute Plan for Node : %s took %d ms ",
-            node.getDataNodeUUID(), endTime - startTime);
+        .format("Compute Plan for Node : %s:%d took %d ms ",
+            node.getDataNodeName(), node.getDataNodePort(),
+            endTime - startTime);
     LOG.info(message);
     return plan;
   }

DiskBalancer.java

@@ -51,16 +51,6 @@
  * utilization equal and then those moves are executed by the datanode.
  */
 public class DiskBalancer extends Configured implements Tool {
-  /**
-   * NameNodeURI can point to either a real namenode, or a json file that
-   * contains the diskBalancer data in json form, that jsonNodeConnector knows
-   * how to deserialize.
-   * <p>
-   * Expected formats are :
-   * <p>
-   * hdfs://namenode.uri or file:///data/myCluster.json
-   */
-  public static final String NAMENODEURI = "fs";
   /**
    * Computes a plan for a given set of nodes.
    */
@@ -275,13 +265,6 @@ public static Options getReportOptions() {
    */
   private void addPlanCommands(Options opt) {
-    Option uri = OptionBuilder.withLongOpt(NAMENODEURI)
-        .withDescription("Address of the Namenode. e,g. hdfs://namenode")
-        .hasArg()
-        .create();
-
-    getPlanOptions().addOption(uri);
-    opt.addOption(uri);
     Option plan = OptionBuilder.withLongOpt(PLAN)
         .withDescription("creates a plan for datanode.")
         .hasArg()
@@ -336,7 +319,6 @@ private void addPlanCommands(Options opt) {
   private void addHelpCommands(Options opt) {
     Option help = OptionBuilder.withLongOpt(HELP)
         .hasOptionalArg()
-        .withArgName(HELP)
         .withDescription("valid commands are plan | execute | query | cancel" +
            " | report")
         .create();

HDFSDiskbalancer.md

@@ -53,11 +53,9 @@ The following sections discusses what commands are supported by disk balancer
 The plan command can be run against a given datanode by running

-`hdfs diskbalancer -uri hdfs://mycluster.com -plan node1.mycluster.com`
+`hdfs diskbalancer -plan node1.mycluster.com`

-uri is the address of the namenode and -plan points to the datanode that we
-need to plan for. By deafult, plan command writes its output to
-**/system/diskbalancer**.
+The command accepts [Generic Options](../hadoop-common/CommandsManual.html#Generic_Options).

 The plan command also has a set of parameters that allows user to control
 the output and execution of the plan.
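
With this change the namenode no longer needs a tool-specific flag: it can still be supplied per invocation through the `-fs` generic option, for example `hdfs diskbalancer -fs hdfs://mycluster.com -plan node1.mycluster.com` (hostnames reused from the example above); when `-fs` is omitted, the tool falls back to the `fs.defaultFS` setting of the cluster configuration.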

TestDiskBalancerCommand.java

@@ -31,6 +31,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -74,8 +75,7 @@ public void tearDown() throws Exception {
   /* test basic report */
   @Test(timeout=60000)
   public void testReportSimple() throws Exception {
-    final String cmdLine = String.format("hdfs diskbalancer -fs %s -report",
-        clusterJson.toString());
+    final String cmdLine = "hdfs diskbalancer -report";
     final List<String> outputs = runCommand(cmdLine);

     assertThat(
@@ -103,8 +103,7 @@ public void testReportSimple() throws Exception {
   /* test less than 64 DataNode(s) as total, e.g., -report -top 32 */
   @Test(timeout=60000)
   public void testReportLessThanTotal() throws Exception {
-    final String cmdLine = String.format(
-        "hdfs diskbalancer -fs %s -report -top 32", clusterJson.toString());
+    final String cmdLine = "hdfs diskbalancer -report -top 32";
     final List<String> outputs = runCommand(cmdLine);

     assertThat(
@@ -127,8 +126,7 @@ public void testReportLessThanTotal() throws Exception {
   /* test more than 64 DataNode(s) as total, e.g., -report -top 128 */
   @Test(timeout=60000)
   public void testReportMoreThanTotal() throws Exception {
-    final String cmdLine = String.format(
-        "hdfs diskbalancer -fs %s -report -top 128", clusterJson.toString());
+    final String cmdLine = "hdfs diskbalancer -report -top 128";
     final List<String> outputs = runCommand(cmdLine);

     assertThat(
@@ -152,8 +150,7 @@ public void testReportMoreThanTotal() throws Exception {
   /* test invalid top limit, e.g., -report -top xx */
   @Test(timeout=60000)
   public void testReportInvalidTopLimit() throws Exception {
-    final String cmdLine = String.format(
-        "hdfs diskbalancer -fs %s -report -top xx", clusterJson.toString());
+    final String cmdLine = "hdfs diskbalancer -report -top xx";
     final List<String> outputs = runCommand(cmdLine);

     assertThat(
@@ -177,14 +174,11 @@ public void testReportInvalidTopLimit() throws Exception {
             containsString("9 volumes with node data density 1.97"))));
   }

-  /* test -report -node DataNodeID */
   @Test(timeout=60000)
   public void testReportNode() throws Exception {
-    final String cmdLine = String
-        .format(
-            "hdfs diskbalancer -fs %s -report -node "
-                + "a87654a9-54c7-4693-8dd9-c9c7021dc340",
-            clusterJson.toString());
+    final String cmdLine =
+        "hdfs diskbalancer -report -node " +
+            "a87654a9-54c7-4693-8dd9-c9c7021dc340";
     final List<String> outputs = runCommand(cmdLine);

     assertThat(
@@ -275,6 +269,7 @@ private List<String> runCommand(final String cmdLine) throws Exception {
     org.apache.hadoop.hdfs.tools.DiskBalancer db =
         new org.apache.hadoop.hdfs.tools.DiskBalancer(conf);
+    FileSystem.setDefaultUri(conf, clusterJson);

     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bufOut);
     db.run(cmds, out);
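
The updated runCommand shows the pattern that also works outside the test: make a JSON cluster dump the default filesystem URI (the format the removed NAMENODEURI javadoc described) and invoke the tool directly. A minimal sketch, assuming a dump at an illustrative path:

// Minimal sketch mirroring the updated test: the default filesystem URI is
// what Command#readClusterInfo now uses to locate the cluster, so a
// file:// JSON dump (the path below is illustrative) can stand in for "-fs".
import java.io.PrintStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DiskBalancerReportDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    FileSystem.setDefaultUri(conf, "file:///data/myCluster.json");
    org.apache.hadoop.hdfs.tools.DiskBalancer db =
        new org.apache.hadoop.hdfs.tools.DiskBalancer(conf);
    // Same entry point the test uses: run a report against the default URI.
    db.run(new String[] {"-report", "-top", "5"}, new PrintStream(System.out));
  }
}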