From 690d8a368d3e967495eafea27659b6124989c89e Mon Sep 17 00:00:00 2001
From: Akira Ajisaka
Date: Wed, 30 Mar 2016 11:42:54 +0900
Subject: [PATCH] MAPREDUCE-6663. [NNBench] Refactor nnbench as a Tool
 implementation. Contributed by Brahma Reddy Battula.

---
 .../java/org/apache/hadoop/hdfs/NNBench.java  | 237 ++++++++++--------
 .../org/apache/hadoop/hdfs/TestNNBench.java   |  84 +++++++
 2 files changed, 210 insertions(+), 111 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
index 96c4710b48..ee3cc00fd1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/NNBench.java
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
-import java.net.InetAddress;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Iterator;
@@ -33,6 +32,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,6 +43,7 @@
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
@@ -54,6 +55,8 @@
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * This program executes a specified operation that applies load to
@@ -74,49 +77,48 @@
  * must be run before running the other operations.
  */
-public class NNBench {
+public class NNBench extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(
           "org.apache.hadoop.hdfs.NNBench");
 
-  protected static String CONTROL_DIR_NAME = "control";
-  protected static String OUTPUT_DIR_NAME = "output";
-  protected static String DATA_DIR_NAME = "data";
-  protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
-  protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
-
-  public static String operation = "none";
-  public static long numberOfMaps = 1l; // default is 1
-  public static long numberOfReduces = 1l; // default is 1
-  public static long startTime =
+  private static String CONTROL_DIR_NAME = "control";
+  private static String OUTPUT_DIR_NAME = "output";
+  private static String DATA_DIR_NAME = "data";
+  static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
+  private static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
+
+  private String operation = "none";
+  private long numberOfMaps = 1l; // default is 1
+  private long numberOfReduces = 1l; // default is 1
+  private long startTime =
           System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
-  public static long blockSize = 1l; // default is 1
-  public static int bytesToWrite = 0; // default is 0
-  public static long bytesPerChecksum = 1l; // default is 1
-  public static long numberOfFiles = 1l; // default is 1
-  public static short replicationFactorPerFile = 1; // default is 1
-  public static String baseDir = "/benchmarks/NNBench"; // default
-  public static boolean readFileAfterOpen = false; // default is to not read
-
+  private long blockSize = 1l; // default is 1
+  private int bytesToWrite = 0; // default is 0
+  private long bytesPerChecksum = 1l; // default is 1
+  private long numberOfFiles = 1l; // default is 1
+  private short replicationFactorPerFile = 1; // default is 1
+  private String baseDir = "/benchmarks/NNBench"; // default
+  private boolean readFileAfterOpen = false; // default is to not read
+  private boolean isHelpMessage = false;
   // Supported operations
   private static final String OP_CREATE_WRITE = "create_write";
   private static final String OP_OPEN_READ = "open_read";
   private static final String OP_RENAME = "rename";
   private static final String OP_DELETE = "delete";
+  private static final int MAX_OPERATION_EXCEPTIONS = 1000;
 
   // To display in the format that matches the NN and DN log format
   // Example: 2007-10-26 00:01:19,853
   static SimpleDateFormat sdf =
           new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
-
-  private static Configuration config = new Configuration();
 
   /**
    * Clean up the files before a test run
    *
    * @throws IOException on error
    */
-  private static void cleanupBeforeTestrun() throws IOException {
-    FileSystem tempFS = FileSystem.get(config);
+  private void cleanupBeforeTestrun() throws IOException {
+    FileSystem tempFS = FileSystem.get(getConf());
 
     // Delete the data directory only if it is the create/write operation
     if (operation.equals(OP_CREATE_WRITE)) {
@@ -133,8 +135,7 @@ private static void cleanupBeforeTestrun() throws IOException {
    *
    * @throws IOException on error
    */
-  private static void createControlFiles() throws IOException {
-    FileSystem tempFS = FileSystem.get(config);
+  private void createControlFiles() throws IOException {
     LOG.info("Creating " + numberOfMaps + " control files");
 
     for (int i = 0; i < numberOfMaps; i++) {
@@ -144,8 +145,9 @@ private static void createControlFiles() throws IOException {
 
       SequenceFile.Writer writer = null;
       try {
-        writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class,
-                LongWritable.class, CompressionType.NONE);
+        writer = SequenceFile.createWriter(getConf(), Writer.file(filePath),
+            Writer.keyClass(Text.class), Writer.valueClass(LongWritable.class),
+            Writer.compression(CompressionType.NONE));
         writer.append(new Text(strFileName), new LongWritable(i));
       } finally {
         if (writer != null) {
@@ -208,23 +210,23 @@ private static void displayUsage() {
    * line's arguments
    * @param length total number of arguments
    */
-  public static void checkArgs(final int index, final int length) {
+  private static void checkArgs(final int index, final int length) {
     if (index == length) {
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException("Not enough arguments");
     }
   }
 
   /**
    * Parse input arguments
+   * @param args array of command line's parameters to be parsed
    *
-   * @param args array of command line's parameters to be parsed
    */
-  public static void parseInputs(final String[] args) {
+  private void parseInputs(final String[] args) {
     // If there are no command line arguments, exit
     if (args.length == 0) {
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException("Give valid inputs");
     }
 
     // Parse command line args
@@ -263,7 +265,7 @@ public static void parseInputs(final String[] args) {
         readFileAfterOpen = Boolean.parseBoolean(args[++i]);
       } else if (args[i].equals("-help")) {
         displayUsage();
-        System.exit(-1);
+        isHelpMessage = true;
       }
     }
 
@@ -281,31 +283,30 @@ public static void parseInputs(final String[] args) {
     LOG.info("        Read file after open: " + readFileAfterOpen);
 
     // Set user-defined parameters, so the map method can access the values
-    config.set("test.nnbench.operation", operation);
-    config.setLong("test.nnbench.maps", numberOfMaps);
-    config.setLong("test.nnbench.reduces", numberOfReduces);
-    config.setLong("test.nnbench.starttime", startTime);
-    config.setLong("test.nnbench.blocksize", blockSize);
-    config.setInt("test.nnbench.bytestowrite", bytesToWrite);
-    config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
-    config.setLong("test.nnbench.numberoffiles", numberOfFiles);
-    config.setInt("test.nnbench.replicationfactor",
+    getConf().set("test.nnbench.operation", operation);
+    getConf().setLong("test.nnbench.maps", numberOfMaps);
+    getConf().setLong("test.nnbench.reduces", numberOfReduces);
+    getConf().setLong("test.nnbench.starttime", startTime);
+    getConf().setLong("test.nnbench.blocksize", blockSize);
+    getConf().setInt("test.nnbench.bytestowrite", bytesToWrite);
+    getConf().setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
+    getConf().setLong("test.nnbench.numberoffiles", numberOfFiles);
+    getConf().setInt("test.nnbench.replicationfactor",
             (int) replicationFactorPerFile);
-    config.set("test.nnbench.basedir", baseDir);
-    config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
+    getConf().set("test.nnbench.basedir", baseDir);
+    getConf().setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
 
-    config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
-    config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
-    config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
+    getConf().set("test.nnbench.datadir.name", DATA_DIR_NAME);
+    getConf().set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
+    getConf().set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
   }
 
   /**
    * Analyze the results
-   *
    * @throws IOException on error
    */
-  private static void analyzeResults() throws IOException {
-    final FileSystem fs = FileSystem.get(config);
+  private int analyzeResults() throws IOException {
+    final FileSystem fs = FileSystem.get(getConf());
     Path reduceDir = new Path(baseDir, OUTPUT_DIR_NAME);
 
     long totalTimeAL1 = 0l;
@@ -322,32 +323,31 @@ private static void analyzeResults() throws IOException {
 
     for (FileStatus status : fss) {
       Path reduceFile = status.getPath();
-      DataInputStream in;
-      in = new DataInputStream(fs.open(reduceFile));
+      try (DataInputStream in = new DataInputStream(fs.open(reduceFile));
+          BufferedReader lines =
+              new BufferedReader(new InputStreamReader(in))) {
 
-      BufferedReader lines;
-      lines = new BufferedReader(new InputStreamReader(in));
-
-      String line;
-      while ((line = lines.readLine()) != null) {
-        StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
-        String attr = tokens.nextToken();
-        if (attr.endsWith(":totalTimeAL1")) {
-          totalTimeAL1 = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":totalTimeAL2")) {
-          totalTimeAL2 = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":totalTimeTPmS")) {
-          totalTimeTPmS = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":latemaps")) {
-          lateMaps = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":numOfExceptions")) {
-          numOfExceptions = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":successfulFileOps")) {
-          successfulFileOps = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":mapStartTimeTPmS")) {
-          mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
-        } else if (attr.endsWith(":mapEndTimeTPmS")) {
-          mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+        String line;
+        while ((line = lines.readLine()) != null) {
+          StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%;");
+          String attr = tokens.nextToken();
+          if (attr.endsWith(":totalTimeAL1")) {
+            totalTimeAL1 = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":totalTimeAL2")) {
+            totalTimeAL2 = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":totalTimeTPmS")) {
+            totalTimeTPmS = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":latemaps")) {
+            lateMaps = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":numOfExceptions")) {
+            numOfExceptions = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":successfulFileOps")) {
+            successfulFileOps = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":mapStartTimeTPmS")) {
+            mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
+          } else if (attr.endsWith(":mapEndTimeTPmS")) {
+            mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
+          }
         }
       }
     }
@@ -444,25 +444,29 @@ private static void analyzeResults() throws IOException {
         "          RAW DATA: # of exceptions: " + numOfExceptions,
         "" };
 
-    PrintStream res = new PrintStream(new FileOutputStream(
-        new File(DEFAULT_RES_FILE_NAME), true));
-
-    // Write to a file and also dump to log
-    for(int i = 0; i < resultLines.length; i++) {
-      LOG.info(resultLines[i]);
-      res.println(resultLines[i]);
+    try (PrintStream res = new PrintStream(
+        new FileOutputStream(new File(DEFAULT_RES_FILE_NAME), true))) {
+      // Write to a file and also dump to log
+      for (String resultLine : resultLines) {
+        LOG.info(resultLine);
+        res.println(resultLine);
+      }
     }
+    if(numOfExceptions >= MAX_OPERATION_EXCEPTIONS){
+      return -1;
+    }
+    return 0;
   }
-
+
   /**
    * Run the test
    *
    * @throws IOException on error
    */
-  public static void runTests() throws IOException {
-    config.setLong("io.bytes.per.checksum", bytesPerChecksum);
+  private void runTests() throws IOException {
+    getConf().setLong("io.bytes.per.checksum", bytesPerChecksum);
 
-    JobConf job = new JobConf(config, NNBench.class);
+    JobConf job = new JobConf(getConf(), NNBench.class);
 
     job.setJobName("NNBench-" + operation);
     FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
@@ -487,7 +491,7 @@ public static void runTests() throws IOException {
   /**
    * Validate the inputs
    */
-  public static void validateInputs() {
+  private void validateInputs() {
     // If it is not one of the four operations, then fail
     if (!operation.equals(OP_CREATE_WRITE) &&
             !operation.equals(OP_OPEN_READ) &&
@@ -495,7 +499,8 @@ public static void validateInputs() {
             !operation.equals(OP_DELETE)) {
       System.err.println("Error: Unknown operation: " + operation);
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Unknown operation: " + operation);
     }
 
     // If number of maps is a negative number, then fail
@@ -503,57 +508,66 @@ public static void validateInputs() {
     if (numberOfMaps < 0) {
       System.err.println("Error: Number of maps must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of maps must be a positive number");
     }
 
     // If number of reduces is a negative number or 0, then fail
     if (numberOfReduces <= 0) {
       System.err.println("Error: Number of reduces must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of reduces must be a positive number");
     }
 
     // If blocksize is a negative number or 0, then fail
     if (blockSize <= 0) {
       System.err.println("Error: Block size must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Block size must be a positive number");
    }
 
     // If bytes to write is a negative number, then fail
     if (bytesToWrite < 0) {
       System.err.println("Error: Bytes to write must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Bytes to write must be a positive number");
     }
 
     // If bytes per checksum is a negative number, then fail
     if (bytesPerChecksum < 0) {
       System.err.println("Error: Bytes per checksum must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Bytes per checksum must be a positive number");
     }
 
     // If number of files is a negative number, then fail
     if (numberOfFiles < 0) {
       System.err.println("Error: Number of files must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Number of files must be a positive number");
     }
 
     // If replication factor is a negative number, then fail
     if (replicationFactorPerFile < 0) {
       System.err.println("Error: Replication factor must be a positive number");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Replication factor must be a positive number");
    }
 
     // If block size is not a multiple of bytesperchecksum, fail
     if (blockSize % bytesPerChecksum != 0) {
-      System.err.println("Error: Block Size in bytes must be a multiple of " +
-              "bytes per checksum: ");
+      System.err.println("Error: Block Size in bytes must be a multiple of "
+          + "bytes per checksum: ");
       displayUsage();
-      System.exit(-1);
+      throw new HadoopIllegalArgumentException(
+          "Error: Block Size in bytes must be a multiple of "
+              + "bytes per checksum:");
     }
   }
 
   /**
@@ -562,13 +576,22 @@ public static void validateInputs() {
    * @param args array of command line arguments
   * @throws IOException indicates a problem with test startup
    */
-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new NNBench(), args);
+    System.exit(res);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
     // Display the application version string
     displayVersion();
 
     // Parse the inputs
     parseInputs(args);
-
+    if (isHelpMessage) {
+      return 0;
+    }
+
     // Validate inputs
     validateInputs();
 
@@ -582,7 +605,7 @@ public static void main(String[] args) throws IOException {
     runTests();
 
     // Analyze results
-    analyzeResults();
+    return analyzeResults();
   }
 
@@ -592,7 +615,6 @@ public static void main(String[] args) throws IOException {
   static class NNBenchMapper extends Configured
           implements Mapper<Text, LongWritable, Text, Text> {
     FileSystem filesystem = null;
-    private String hostName = null;
 
     long numberOfFiles = 1l;
     long blkSize = 1l;
@@ -602,7 +624,6 @@ static class NNBenchMapper extends Configured
     String dataDirName = null;
     String op = null;
     boolean readFile = false;
-    final int MAX_OPERATION_EXCEPTIONS = 1000;
 
     // Data to collect from the operation
     int numOfExceptions = 0;
@@ -628,12 +649,6 @@ public void configure(JobConf conf) {
       } catch(Exception e) {
         throw new RuntimeException("Cannot get file system.", e);
       }
-
-      try {
-        hostName = InetAddress.getLocalHost().getHostName();
-      } catch(Exception e) {
-        throw new RuntimeException("Error getting hostname", e);
-      }
     }
 
     /**
@@ -678,7 +693,7 @@ public void map(Text key, LongWritable value,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
-      Configuration conf = filesystem.getConf();
+      Configuration conf = getConf();
 
       numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
       blkSize = conf.getLong("test.nnbench.blocksize", 1l);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java
new file mode 100644
index 0000000000..9f9814d722
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/hdfs/TestNNBench.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestNNBench extends HadoopTestCase {
+  private static final String BASE_DIR =
+      new File(System.getProperty("test.build.data", "build/test/data"),
+          "NNBench").getAbsolutePath();
+
+  public TestNNBench() throws IOException {
+    super(LOCAL_MR, LOCAL_FS, 1, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    getFileSystem().delete(new Path(BASE_DIR), true);
+    getFileSystem().delete(new Path(NNBench.DEFAULT_RES_FILE_NAME), true);
+    super.tearDown();
+  }
+
+  @Test(timeout = 30000)
+  public void testNNBenchCreateReadAndDelete() throws Exception {
+    runNNBench(createJobConf(), "create_write");
+    Path path = new Path(BASE_DIR + "/data/file_0_0");
+    assertTrue("create_write should create the file",
+        getFileSystem().exists(path));
+    runNNBench(createJobConf(), "open_read");
+    runNNBench(createJobConf(), "delete");
+    assertFalse("Delete operation should delete the file",
+        getFileSystem().exists(path));
+  }
+
+  @Test(timeout = 30000)
+  public void testNNBenchCreateAndRename() throws Exception {
+    runNNBench(createJobConf(), "create_write");
+    Path path = new Path(BASE_DIR + "/data/file_0_0");
+    assertTrue("create_write should create the file",
+        getFileSystem().exists(path));
+    runNNBench(createJobConf(), "rename");
+    Path renamedPath = new Path(BASE_DIR + "/data/file_0_r_0");
+    assertFalse("Rename should rename the file", getFileSystem().exists(path));
+    assertTrue("Rename should rename the file",
+        getFileSystem().exists(renamedPath));
+  }
+
+  private void runNNBench(Configuration conf, String operation)
+      throws Exception {
+    String[] genArgs = { "-operation", operation, "-baseDir", BASE_DIR,
+        "-startTime", "" + (Time.now() / 1000 + 3) };
+
+    assertEquals(0, ToolRunner.run(conf, new NNBench(), genArgs));
+  }
+
+}
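
Note for reviewers: after this patch NNBench no longer calls System.exit() from library code. Usage errors surface as HadoopIllegalArgumentException and run() returns a status, so the benchmark can be embedded programmatically as well as launched from the command line. Below is a minimal sketch of programmatic use, assuming NNBench is on the classpath (it lives under the jobclient test sources, per the paths above); the -operation, -baseDir, and -startTime flags are the ones exercised by TestNNBench, and the base-directory value is simply the NNBench default:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.NNBench;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.ToolRunner;

public class NNBenchDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Start the maps a few seconds in the future, as TestNNBench does.
    String[] benchArgs = { "-operation", "create_write",
        "-baseDir", "/benchmarks/NNBench",
        "-startTime", "" + (Time.now() / 1000 + 3) };
    // run() returns 0 on success and -1 once the number of operation
    // exceptions reaches MAX_OPERATION_EXCEPTIONS; invalid arguments now
    // throw HadoopIllegalArgumentException instead of exiting the JVM.
    int res = ToolRunner.run(conf, new NNBench(), benchArgs);
    System.exit(res);
  }
}

The command-line path is identical, since main() now just delegates to ToolRunner.run(new NNBench(), args), which also gives NNBench the standard generic options (-D, -conf, and so on) for free.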
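The control-file hunk also moves off the FileSystem-based SequenceFile.createWriter overload onto the Writer.Option builder form, which resolves the target FileSystem from the path and Configuration itself. A standalone sketch of that pattern, using try-with-resources rather than the patch's try/finally (the path and key/value content here are illustrative placeholders, not values from the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;

public class ControlFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical control-file path; NNBench creates one file per map task.
    Path filePath = new Path("control/NNBench_Controlfile_0");
    // The builder form derives the FileSystem from the path and conf, so the
    // explicit FileSystem argument of the deprecated overload goes away.
    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        Writer.file(filePath), Writer.keyClass(Text.class),
        Writer.valueClass(LongWritable.class),
        Writer.compression(CompressionType.NONE))) {
      writer.append(new Text("NNBench_Controlfile_0"), new LongWritable(0));
    }
  }
}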