HDFS-10558. DiskBalancer: Print the full path to plan file. Contributed by Xiaobing Zhou.

Author: Anu Engineer
Date:   2016-10-14 17:07:59 -07:00
Parent: 76cc84e6d4
Commit: 30bb1970cc

8 changed files with 218 additions and 35 deletions
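
The net effect of the change: Command gains a PrintStream, PlanCommand records its
status lines into a StrBuilder and prints them through that stream, and
DiskBalancerCLI passes its own printStream down to PlanCommand, so the console now
shows the full path of the generated plan file. A rough sketch of what a plan run is
expected to print after this change (the datanode UUID and the exact plan-file name
are placeholders, not taken from this commit; the test below only asserts the
"Writing plan to" prefix and the /system/diskbalancer parent directory):

  $ hdfs diskbalancer -plan <datanode-uuid>
  Writing plan to: /system/diskbalancer/<plan-file>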

@@ -53,6 +53,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URL;
@@ -82,6 +83,7 @@ public abstract class Command extends Configured {
   private FileSystem fs = null;
   private DiskBalancerCluster cluster = null;
   private int topNodes;
+  private PrintStream ps;
 
   private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer");
@@ -91,9 +93,25 @@ public abstract class Command extends Configured {
    * Constructs a command.
    */
   public Command(Configuration conf) {
+    this(conf, System.out);
+  }
+
+  /**
+   * Constructs a command.
+   */
+  public Command(Configuration conf, final PrintStream ps) {
     super(conf);
     // These arguments are valid for all commands.
     topNodes = 0;
+    this.ps = ps;
+  }
+
+  /**
+   * Gets printing stream.
+   * @return print stream
+   */
+  PrintStream getPrintStream() {
+    return ps;
   }
 
   /**
@@ -423,7 +441,8 @@ protected void addValidCommandParameters(String key, String desc) {
    *
    * @return Cluster.
    */
-  protected DiskBalancerCluster getCluster() {
+  @VisibleForTesting
+  DiskBalancerCluster getCluster() {
     return cluster;
   }

@@ -78,7 +78,7 @@ public void execute(CommandLine cmd) throws Exception {
       command = new CancelCommand(getConf());
       break;
     case DiskBalancerCLI.REPORT:
-      command = new ReportCommand(getConf(), null);
+      command = new ReportCommand(getConf());
       break;
     default:
       command = this;

@@ -18,9 +18,12 @@
 package org.apache.hadoop.hdfs.server.diskbalancer.command;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.text.StrBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +34,7 @@
 import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
 
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.io.PrintStream;
 
 /**
  * Class that implements Plan Command.
@@ -49,7 +53,14 @@ public class PlanCommand extends Command {
    * Constructs a plan command.
    */
   public PlanCommand(Configuration conf) {
-    super(conf);
+    this(conf, System.out);
+  }
+
+  /**
+   * Constructs a plan command.
+   */
+  public PlanCommand(Configuration conf, final PrintStream ps) {
+    super(conf, ps);
     this.thresholdPercentage = 1;
     this.bandwidth = 0;
     this.maxError = 0;
@@ -73,9 +84,12 @@ public PlanCommand(Configuration conf) {
    * -plan -node IP -plan -node hostName -plan -node DatanodeUUID
    *
    * @param cmd - CommandLine
+   * @throws Exception
    */
   @Override
   public void execute(CommandLine cmd) throws Exception {
+    StrBuilder result = new StrBuilder();
+    String outputLine = "";
     LOG.debug("Processing Plan Command.");
     Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.PLAN));
     verifyCommandOptions(DiskBalancerCLI.PLAN, cmd);
@@ -131,22 +145,35 @@ public void execute(CommandLine cmd) throws Exception {
           .getBytes(StandardCharsets.UTF_8));
     }
 
-    if (plan != null && plan.getVolumeSetPlans().size() > 0) {
-      LOG.info("Writing plan to : {}", getOutputPath());
-      try (FSDataOutputStream planStream = create(String.format(
-          DiskBalancerCLI.PLAN_TEMPLATE,
-          cmd.getOptionValue(DiskBalancerCLI.PLAN)))) {
-        planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8));
+    try {
+      if (plan != null && plan.getVolumeSetPlans().size() > 0) {
+        outputLine = String.format("Writing plan to: %s", getOutputPath());
+        recordOutput(result, outputLine);
+        try (FSDataOutputStream planStream = create(String.format(
+            DiskBalancerCLI.PLAN_TEMPLATE,
+            cmd.getOptionValue(DiskBalancerCLI.PLAN)))) {
+          planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8));
+        }
+      } else {
+        outputLine = String.format(
+            "No plan generated. DiskBalancing not needed for node: %s"
+            + " threshold used: %s",
+            cmd.getOptionValue(DiskBalancerCLI.PLAN), this.thresholdPercentage);
+        recordOutput(result, outputLine);
       }
-    } else {
-      LOG.info("No plan generated. DiskBalancing not needed for node: {} " +
-          "threshold used: {}", cmd.getOptionValue(DiskBalancerCLI.PLAN),
-          this.thresholdPercentage);
+
+      if (cmd.hasOption(DiskBalancerCLI.VERBOSE) && plans.size() > 0) {
+        printToScreen(plans);
+      }
+    } catch (Exception e) {
+      final String errMsg =
+          "Errors while recording the output of plan command.";
+      LOG.error(errMsg, e);
+      result.appendln(errMsg);
+      result.appendln(Throwables.getStackTraceAsString(e));
     }
-    if (cmd.hasOption(DiskBalancerCLI.VERBOSE) && plans.size() > 0) {
-      printToScreen(plans);
-    }
+    getPrintStream().println(result.toString());
   }

@@ -47,11 +47,12 @@
  */
 public class ReportCommand extends Command {
 
-  private PrintStream out;
+  public ReportCommand(Configuration conf) {
+    this(conf, System.out);
+  }
 
-  public ReportCommand(Configuration conf, final PrintStream out) {
-    super(conf);
-    this.out = out;
+  public ReportCommand(Configuration conf, final PrintStream ps) {
+    super(conf, ps);
 
     addValidCommandParameters(DiskBalancerCLI.REPORT,
         "Report volume information of nodes.");
@@ -95,7 +96,7 @@ public void execute(CommandLine cmd) throws Exception {
       handleTopReport(cmd, result, nodeFormat);
     }
 
-    out.println(result.toString());
+    getPrintStream().println(result.toString());
   }
 
   private void handleTopReport(final CommandLine cmd, final StrBuilder result,

@@ -137,6 +137,8 @@ public class DiskBalancerCLI extends Configured implements Tool {
   private final PrintStream printStream;
 
+  private Command currentCommand = null;
+
   /**
    * Construct a DiskBalancer.
    *
@@ -431,6 +433,13 @@ private CommandLine parseArgs(String[] argv, Options opts)
     return parser.parse(opts, argv);
   }
 
+  /**
+   * Gets current command associated with this instance of DiskBalancer.
+   */
+  public Command getCurrentCommand() {
+    return currentCommand;
+  }
+
   /**
    * Dispatches calls to the right command Handler classes.
    *
@@ -440,38 +449,38 @@ private CommandLine parseArgs(String[] argv, Options opts)
    */
   private int dispatch(CommandLine cmd, Options opts)
       throws Exception {
-    Command currentCommand = null;
+    Command dbCmd = null;
     if (cmd.hasOption(DiskBalancerCLI.PLAN)) {
-      currentCommand = new PlanCommand(getConf());
+      dbCmd = new PlanCommand(getConf(), printStream);
     }
 
     if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) {
-      currentCommand = new ExecuteCommand(getConf());
+      dbCmd = new ExecuteCommand(getConf());
    }
 
     if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
-      currentCommand = new QueryCommand(getConf());
+      dbCmd = new QueryCommand(getConf());
     }
 
     if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
-      currentCommand = new CancelCommand(getConf());
+      dbCmd = new CancelCommand(getConf());
     }
 
     if (cmd.hasOption(DiskBalancerCLI.REPORT)) {
-      currentCommand = new ReportCommand(getConf(), this.printStream);
+      dbCmd = new ReportCommand(getConf(), this.printStream);
     }
 
     if (cmd.hasOption(DiskBalancerCLI.HELP)) {
-      currentCommand = new HelpCommand(getConf());
+      dbCmd = new HelpCommand(getConf());
     }
     // Invoke main help here.
-    if (currentCommand == null) {
+    if (dbCmd == null) {
       new HelpCommand(getConf()).execute(null);
       return 1;
     }
 
-    currentCommand.execute(cmd);
+    dbCmd.execute(cmd);
     return 0;
   }
 }

@@ -18,10 +18,20 @@
 package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
@@ -29,9 +39,12 @@
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
 import org.apache.hadoop.util.Time;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Helper class to create various cluster configrations at run time.
@@ -242,6 +255,65 @@ public static int getBlockCount(FsVolumeSpi source) throws IOException {
     return count;
   }
 
+  public static MiniDFSCluster newImbalancedCluster(
+      final Configuration conf,
+      final int numDatanodes,
+      final long[] storageCapacities,
+      final int defaultBlockSize,
+      final int fileLen)
+      throws IOException, InterruptedException, TimeoutException {
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+    final String fileName = "/" + UUID.randomUUID().toString();
+    final Path filePath = new Path(fileName);
+
+    Preconditions.checkNotNull(storageCapacities);
+    Preconditions.checkArgument(
+        storageCapacities.length == 2,
+        "need to specify capacities for two storages.");
+
+    // Write a file and restart the cluster
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDatanodes)
+        .storageCapacities(storageCapacities)
+        .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK})
+        .storagesPerDatanode(2)
+        .build();
+    FsVolumeImpl source = null;
+    FsVolumeImpl dest = null;
+
+    cluster.waitActive();
+    Random r = new Random();
+    FileSystem fs = cluster.getFileSystem(0);
+    TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, 0);
+
+    DFSTestUtil.waitReplication(fs, filePath, (short) 1);
+    cluster.restartDataNodes();
+    cluster.waitActive();
+
+    // Get the data node and move all data to one disk.
+    for (int i = 0; i < numDatanodes; i++) {
+      DataNode dnNode = cluster.getDataNodes().get(i);
+      try (FsDatasetSpi.FsVolumeReferences refs =
+          dnNode.getFSDataset().getFsVolumeReferences()) {
+        source = (FsVolumeImpl) refs.get(0);
+        dest = (FsVolumeImpl) refs.get(1);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0);
+        DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(),
+            source, dest);
+        assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0);
+      }
+    }
+
+    cluster.restartDataNodes();
+    cluster.waitActive();
+
+    return cluster;
+  }
+
   /**
    * Moves all blocks to the destination volume.
    *

@@ -244,7 +244,9 @@ public void testDiskBalancerWhenRemovingVolumes() throws Exception {
     } catch (Exception e) {
       Assert.fail("Unexpected exception: " + e);
     } finally {
-      cluster.shutdown();
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }

@@ -38,6 +38,7 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
 import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
 import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
@@ -409,14 +410,53 @@ public void testHelpCommand() throws Exception {
     runCommand(cmdLine);
   }
 
-  private List<String> runCommandInternal(final String cmdLine) throws
-      Exception {
+  @Test
+  public void testPrintFullPathOfPlan()
+      throws Exception {
+    MiniDFSCluster miniCluster = null;
+    try {
+      Configuration hdfsConf = new HdfsConfiguration();
+      final int numDatanodes = 1;
+      final int defaultBlockSize = 1024;
+      final int fileLen = 200 * 1024;
+      final long capcacity = 300 * 1024;
+      final long[] capacities = new long[] {capcacity, capcacity};
+      List<String> outputs = null;
+
+      /* new cluster with imbalanced capacity */
+      miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
+          hdfsConf,
+          numDatanodes,
+          capacities,
+          defaultBlockSize,
+          fileLen);
+
+      /* run plan command */
+      final String cmdLine = String.format(
+          "hdfs diskbalancer -%s %s",
+          PLAN,
+          miniCluster.getDataNodes().get(0).getDatanodeUuid());
+      outputs = runCommand(cmdLine, hdfsConf, miniCluster);
+
+      /* verify the path of plan */
+      assertThat(outputs.get(0), containsString("Writing plan to"));
+      assertThat(outputs.get(0), containsString("/system/diskbalancer"));
+    } finally {
+      if (miniCluster != null) {
+        miniCluster.shutdown();
+      }
+    }
+  }
+
+  private List<String> runCommandInternal(
+      final String cmdLine,
+      final Configuration clusterConf) throws Exception {
     String[] cmds = StringUtils.split(cmdLine, ' ');
     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bufOut);
-    Tool diskBalancerTool = new DiskBalancerCLI(conf, out);
-    ToolRunner.run(conf, diskBalancerTool, cmds);
+    Tool diskBalancerTool = new DiskBalancerCLI(clusterConf, out);
+    ToolRunner.run(clusterConf, diskBalancerTool, cmds);
 
     Scanner scanner = new Scanner(bufOut.toString());
     List<String> outputs = Lists.newArrayList();
@@ -426,6 +466,11 @@ private List<String> runCommandInternal(final String cmdLine) throws
     return outputs;
   }
 
+  private List<String> runCommandInternal(final String cmdLine)
+      throws Exception {
+    return runCommandInternal(cmdLine, conf);
+  }
+
   private List<String> runCommand(final String cmdLine) throws Exception {
     FileSystem.setDefaultUri(conf, clusterJson);
     return runCommandInternal(cmdLine);
@@ -437,6 +482,14 @@ private List<String> runCommand(final String cmdLine,
     return runCommandInternal(cmdLine);
   }
 
+  private List<String> runCommand(
+      final String cmdLine,
+      Configuration clusterConf,
+      MiniDFSCluster miniCluster) throws Exception {
+    FileSystem.setDefaultUri(clusterConf, miniCluster.getURI());
+    return runCommandInternal(cmdLine, clusterConf);
+  }
+
   /**
    * Making sure that we can query the node without having done a submit.
    * @throws Exception