From 9f32364d283dec47dd07490e253d477a0d14ac71 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 26 Oct 2016 19:39:02 -0700 Subject: [PATCH] HDFS-11038. DiskBalancer: support running multiple commands in single test. Contributed by Xiaobing Zhou. --- .../server/diskbalancer/command/Command.java | 19 ++- .../diskbalancer/command/PlanCommand.java | 15 ++- .../hadoop/hdfs/tools/DiskBalancerCLI.java | 57 ++++---- .../diskbalancer/DiskBalancerTestUtil.java | 2 +- .../command/TestDiskBalancerCommand.java | 123 ++++++++++++++---- 5 files changed, 162 insertions(+), 54 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java index 11c8e7f2d6..b4f95bb4b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java @@ -52,6 +52,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.io.PrintStream; import java.net.InetSocketAddress; @@ -74,7 +75,7 @@ /** * Common interface for command handling. */ -public abstract class Command extends Configured { +public abstract class Command extends Configured implements Closeable { private static final ObjectReader READER = new ObjectMapper().reader(HashMap.class); static final Logger LOG = LoggerFactory.getLogger(Command.class); @@ -106,6 +107,22 @@ public Command(Configuration conf, final PrintStream ps) { this.ps = ps; } + /** + * Cleans any resources held by this command. + *

+ * The main goal is to delete id file created in + * {@link org.apache.hadoop.hdfs.server.balancer + * .NameNodeConnector#checkAndMarkRunning} + * , otherwise, it's not allowed to run multiple commands in a row. + *

+ */ + @Override + public void close() throws IOException { + if (fs != null) { + fs.close(); + } + } + /** * Gets printing stream. * @return print stream diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java index 1d07a63d03..6e45b9672a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java @@ -26,6 +26,7 @@ import org.apache.commons.lang.text.StrBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel .DiskBalancerDataNode; @@ -147,11 +148,17 @@ public void execute(CommandLine cmd) throws Exception { try { if (plan != null && plan.getVolumeSetPlans().size() > 0) { - outputLine = String.format("Writing plan to: %s", getOutputPath()); + outputLine = String.format("Writing plan to:"); recordOutput(result, outputLine); - try (FSDataOutputStream planStream = create(String.format( + + final String planFileName = String.format( DiskBalancerCLI.PLAN_TEMPLATE, - cmd.getOptionValue(DiskBalancerCLI.PLAN)))) { + cmd.getOptionValue(DiskBalancerCLI.PLAN)); + final String planFileFullName = + new Path(getOutputPath(), planFileName).toString(); + recordOutput(result, planFileFullName); + + try (FSDataOutputStream planStream = create(planFileName)) { planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8)); } } else { @@ -173,7 +180,7 @@ public void execute(CommandLine cmd) throws Exception { result.appendln(Throwables.getStackTraceAsString(e)); } - getPrintStream().println(result.toString()); + getPrintStream().print(result.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java index 5bcf939c86..9fdc838bf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java @@ -450,37 +450,44 @@ public Command getCurrentCommand() { private int dispatch(CommandLine cmd, Options opts) throws Exception { Command dbCmd = null; - if (cmd.hasOption(DiskBalancerCLI.PLAN)) { - dbCmd = new PlanCommand(getConf(), printStream); - } + try { + if (cmd.hasOption(DiskBalancerCLI.PLAN)) { + dbCmd = new PlanCommand(getConf(), printStream); + } - if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) { - dbCmd = new ExecuteCommand(getConf()); - } + if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) { + dbCmd = new ExecuteCommand(getConf()); + } - if (cmd.hasOption(DiskBalancerCLI.QUERY)) { - dbCmd = new QueryCommand(getConf()); - } + if (cmd.hasOption(DiskBalancerCLI.QUERY)) { + dbCmd = new QueryCommand(getConf()); + } - if (cmd.hasOption(DiskBalancerCLI.CANCEL)) { - dbCmd = new CancelCommand(getConf()); - } + if (cmd.hasOption(DiskBalancerCLI.CANCEL)) { + dbCmd = new CancelCommand(getConf()); + } - if (cmd.hasOption(DiskBalancerCLI.REPORT)) { - dbCmd = new ReportCommand(getConf(), this.printStream); - } + if (cmd.hasOption(DiskBalancerCLI.REPORT)) { + dbCmd = new ReportCommand(getConf(), this.printStream); + } - if (cmd.hasOption(DiskBalancerCLI.HELP)) { - dbCmd = new HelpCommand(getConf()); - } + if (cmd.hasOption(DiskBalancerCLI.HELP)) { + dbCmd = new HelpCommand(getConf()); + } - // Invoke main help here. - if (dbCmd == null) { - new HelpCommand(getConf()).execute(null); - return 1; - } + // Invoke main help here. + if (dbCmd == null) { + dbCmd = new HelpCommand(getConf()); + dbCmd.execute(null); + return 1; + } - dbCmd.execute(cmd); - return 0; + dbCmd.execute(cmd); + return 0; + } finally { + if (dbCmd != null) { + dbCmd.close(); + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java index a575097595..c60fe2104a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java @@ -47,7 +47,7 @@ import java.util.concurrent.TimeoutException; /** - * Helper class to create various cluster configrations at run time. + * Helper class to create various cluster configurations at run time. */ public class DiskBalancerTestUtil { public static final long MB = 1024 * 1024L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java index 9f9c7b8c1b..0f65f256de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java @@ -18,6 +18,14 @@ package org.apache.hadoop.hdfs.server.diskbalancer.command; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.NODE; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.OUTFILE; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY; +import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT; import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; @@ -34,6 +42,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -44,34 +53,34 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; import org.apache.hadoop.hdfs.tools.DiskBalancerCLI; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import com.google.common.collect.Lists; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.NODE; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY; -import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT; - -import org.junit.Rule; -import org.junit.rules.ExpectedException; - /** * Tests various CLI commands of DiskBalancer. */ public class TestDiskBalancerCommand { + @Rule public ExpectedException thrown = ExpectedException.none(); private MiniDFSCluster cluster; private URI clusterJson; private Configuration conf = new HdfsConfiguration(); + private final static int DEFAULT_BLOCK_SIZE = 1024; + private final static int FILE_LEN = 200 * 1024; + private final static long CAPCACITY = 300 * 1024; + private final static long[] CAPACITIES = new long[] {CAPCACITY, CAPCACITY}; + @Before public void setUp() throws Exception { conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); @@ -87,11 +96,69 @@ public void setUp() throws Exception { public void tearDown() throws Exception { if (cluster != null) { // Just make sure we can shutdown datanodes. - cluster.getDataNodes().get(0).shutdown(); + for (int i = 0; i < cluster.getDataNodes().size(); i++) { + cluster.getDataNodes().get(i).shutdown(); + } cluster.shutdown(); } } + /** + * Tests running multiple commands under on setup. This mainly covers + * {@link org.apache.hadoop.hdfs.server.diskbalancer.command.Command#close} + */ + @Test(timeout = 60000) + public void testRunMultipleCommandsUnderOneSetup() throws Exception { + + final int numDatanodes = 1; + MiniDFSCluster miniCluster = null; + final Configuration hdfsConf = new HdfsConfiguration(); + + try { + /* new cluster with imbalanced capacity */ + miniCluster = DiskBalancerTestUtil.newImbalancedCluster( + hdfsConf, + numDatanodes, + CAPACITIES, + DEFAULT_BLOCK_SIZE, + FILE_LEN); + String cmdLine = ""; + List outputs = null; + final DataNode dn = miniCluster.getDataNodes().get(0); + + /* run plan command */ + cmdLine = String.format( + "hdfs diskbalancer -%s %s", + PLAN, + dn.getDatanodeUuid()); + outputs = runCommand(cmdLine, hdfsConf, miniCluster); + + /* get path of plan file*/ + final String planFileName = dn.getDatanodeUuid(); + + /* verify plan command */ + assertEquals( + "There must be two lines: the 1st is writing plan to...," + + " the 2nd is actual full path of plan file.", + 2, outputs.size()); + assertThat(outputs.get(1), containsString(planFileName)); + + /* get full path of plan file*/ + final String planFileFullName = outputs.get(1); + + /* run execute command */ + cmdLine = String.format( + "hdfs diskbalancer -%s %s", + EXECUTE, + planFileFullName); + outputs = runCommand(cmdLine, hdfsConf, miniCluster); + } finally { + if (miniCluster != null) { + miniCluster.shutdown(); + } + } + } + /* test basic report */ @Test(timeout = 60000) public void testReportSimple() throws Exception { @@ -413,34 +480,44 @@ public void testHelpCommand() throws Exception { @Test public void testPrintFullPathOfPlan() throws Exception { + final Path parent = new Path( + PathUtils.getTestPath(getClass()), + GenericTestUtils.getMethodName()); + MiniDFSCluster miniCluster = null; try { Configuration hdfsConf = new HdfsConfiguration(); - final int numDatanodes = 1; - final int defaultBlockSize = 1024; - final int fileLen = 200 * 1024; - final long capcacity = 300 * 1024; - final long[] capacities = new long[] {capcacity, capcacity}; List outputs = null; /* new cluster with imbalanced capacity */ miniCluster = DiskBalancerTestUtil.newImbalancedCluster( hdfsConf, - numDatanodes, - capacities, - defaultBlockSize, - fileLen); + 1, + CAPACITIES, + DEFAULT_BLOCK_SIZE, + FILE_LEN); /* run plan command */ final String cmdLine = String.format( - "hdfs diskbalancer -%s %s", + "hdfs diskbalancer -%s %s -%s %s", PLAN, - miniCluster.getDataNodes().get(0).getDatanodeUuid()); + miniCluster.getDataNodes().get(0).getDatanodeUuid(), + OUTFILE, + parent); outputs = runCommand(cmdLine, hdfsConf, miniCluster); + /* get full path */ + final String planFileFullName = new Path( + parent, + miniCluster.getDataNodes().get(0).getDatanodeUuid()).toString(); + /* verify the path of plan */ + assertEquals( + "There must be two lines: the 1st is writing plan to," + + " the 2nd is actual full path of plan file.", + 2, outputs.size()); assertThat(outputs.get(0), containsString("Writing plan to")); - assertThat(outputs.get(0), containsString("/system/diskbalancer")); + assertThat(outputs.get(1), containsString(planFileFullName)); } finally { if (miniCluster != null) { miniCluster.shutdown();