From bd524bd9608fa6387623cfe492572722c0477470 Mon Sep 17 00:00:00 2001 From: Giridharan Kesavan Date: Wed, 27 May 2009 14:48:04 +0000 Subject: [PATCH] move test and xml files git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-4687/core@779196 13f79535-47bb-0310-9956-ffa450edef68 --- src/test/hdfs-site.xml | 9 - .../apache/hadoop/fs/AccumulatingReducer.java | 103 -- .../org/apache/hadoop/fs/DFSCIOTest.java | 551 ---------- .../apache/hadoop/fs/DistributedFSCheck.java | 353 ------- .../org/apache/hadoop/fs/IOMapperBase.java | 129 --- .../org/apache/hadoop/fs/TestCopyFiles.java | 853 ---------------- .../org/apache/hadoop/fs/TestDFSIO.java | 445 -------- .../org/apache/hadoop/fs/TestFileSystem.java | 629 ------------ .../apache/hadoop/fs/TestHarFileSystem.java | 213 ---- .../org/apache/hadoop/hdfs/NNBench.java | 964 ------------------ .../apache/hadoop/hdfs/NNBenchWithoutMR.java | 344 ------- .../org/apache/hadoop/io/FileBench.java | 603 ----------- .../io/TestSequenceFileMergeProgress.java | 98 -- .../apache/hadoop/ipc/TestSocketFactory.java | 197 ---- .../TestServiceLevelAuthorization.java | 152 --- .../org/apache/hadoop/test/AllTestDriver.java | 46 - .../hadoop/test/HdfsWithMRTestDriver.java | 75 -- .../org/apache/hadoop/tools/TestDistCh.java | 221 ---- src/test/mapred-site.xml | 18 - src/webapps/datanode/browseBlock.jsp | 404 -------- src/webapps/datanode/browseDirectory.jsp | 192 ---- src/webapps/datanode/tail.jsp | 135 --- src/webapps/hdfs/dfshealth.jsp | 280 ----- src/webapps/hdfs/dfsnodelist.jsp | 276 ----- src/webapps/hdfs/index.html | 35 - src/webapps/hdfs/nn_browsedfscontent.jsp | 77 -- src/webapps/job/analysejobhistory.jsp | 269 ----- src/webapps/job/index.html | 35 - src/webapps/job/jobblacklistedtrackers.jsp | 80 -- src/webapps/job/jobconf.jsp | 71 -- src/webapps/job/jobconf_history.jsp | 75 -- src/webapps/job/jobdetails.jsp | 400 -------- src/webapps/job/jobdetailshistory.jsp | 280 ----- src/webapps/job/jobfailures.jsp | 187 ---- src/webapps/job/jobhistory.jsp | 324 ------ src/webapps/job/jobqueue_details.jsp | 89 -- src/webapps/job/jobtasks.jsp | 154 --- src/webapps/job/jobtaskshistory.jsp | 88 -- src/webapps/job/jobtracker.jsp | 173 ---- src/webapps/job/loadhistory.jsp | 68 -- src/webapps/job/machines.jsp | 138 --- src/webapps/job/taskdetails.jsp | 292 ------ src/webapps/job/taskdetailshistory.jsp | 125 --- src/webapps/job/taskstats.jsp | 106 -- src/webapps/secondary/index.html | 29 - src/webapps/secondary/status.jsp | 39 - src/webapps/static/hadoop-logo.jpg | Bin 9443 -> 0 bytes src/webapps/static/hadoop.css | 134 --- src/webapps/static/jobconf.xsl | 18 - src/webapps/static/jobtracker.js | 151 --- src/webapps/task/index.html | 17 - src/webapps/task/tasktracker.jsp | 108 -- 52 files changed, 10852 deletions(-) delete mode 100644 src/test/hdfs-site.xml delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java delete mode 100644 
src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBenchWithoutMR.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/io/FileBench.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/io/TestSequenceFileMergeProgress.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/ipc/TestSocketFactory.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/test/AllTestDriver.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/test/HdfsWithMRTestDriver.java delete mode 100644 src/test/hdfs-with-mr/org/apache/hadoop/tools/TestDistCh.java delete mode 100644 src/test/mapred-site.xml delete mode 100644 src/webapps/datanode/browseBlock.jsp delete mode 100644 src/webapps/datanode/browseDirectory.jsp delete mode 100644 src/webapps/datanode/tail.jsp delete mode 100644 src/webapps/hdfs/dfshealth.jsp delete mode 100644 src/webapps/hdfs/dfsnodelist.jsp delete mode 100644 src/webapps/hdfs/index.html delete mode 100644 src/webapps/hdfs/nn_browsedfscontent.jsp delete mode 100644 src/webapps/job/analysejobhistory.jsp delete mode 100644 src/webapps/job/index.html delete mode 100644 src/webapps/job/jobblacklistedtrackers.jsp delete mode 100644 src/webapps/job/jobconf.jsp delete mode 100644 src/webapps/job/jobconf_history.jsp delete mode 100644 src/webapps/job/jobdetails.jsp delete mode 100644 src/webapps/job/jobdetailshistory.jsp delete mode 100644 src/webapps/job/jobfailures.jsp delete mode 100644 src/webapps/job/jobhistory.jsp delete mode 100644 src/webapps/job/jobqueue_details.jsp delete mode 100644 src/webapps/job/jobtasks.jsp delete mode 100644 src/webapps/job/jobtaskshistory.jsp delete mode 100644 src/webapps/job/jobtracker.jsp delete mode 100644 src/webapps/job/loadhistory.jsp delete mode 100644 src/webapps/job/machines.jsp delete mode 100644 src/webapps/job/taskdetails.jsp delete mode 100644 src/webapps/job/taskdetailshistory.jsp delete mode 100644 src/webapps/job/taskstats.jsp delete mode 100644 src/webapps/secondary/index.html delete mode 100644 src/webapps/secondary/status.jsp delete mode 100644 src/webapps/static/hadoop-logo.jpg delete mode 100644 src/webapps/static/hadoop.css delete mode 100644 src/webapps/static/jobconf.xsl delete mode 100644 src/webapps/static/jobtracker.js delete mode 100644 src/webapps/task/index.html delete mode 100644 src/webapps/task/tasktracker.jsp diff --git a/src/test/hdfs-site.xml b/src/test/hdfs-site.xml deleted file mode 100644 index cbd6ab6eff..0000000000 --- a/src/test/hdfs-site.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java deleted file mode 100644 index 8c00ac8b8c..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * Reducer that accumulates values based on their type. - *

- * The type is specified in the key part of the key-value pair
- * as a prefix to the key in the following way:
- * <p>
- * <tt>type:key</tt>
- * <p>
- * The values are accumulated according to the types:
- * <ul>
- * <li><tt>s:</tt> - string values are concatenated with ';'</li>
- * <li><tt>f:</tt> - float values are summed</li>
- * <li><tt>l:</tt> - long values are summed</li>
- * </ul>
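For illustration, a map task feeding this reducer emits key-value pairs of this shape; a minimal sketch, assuming a hypothetical helper class (only the "l:", "f:" and "s:" prefixes come from the constants declared below):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;

    // Minimal sketch of the convention described above: the mapper prefixes each
    // key with the value type, and AccumulatingReducer sums "l:" keys as longs,
    // sums "f:" keys as floats, and concatenates "s:" values.
    class TypedStatsEmitter {
      static void emitTaskStats(OutputCollector<Text, Text> out,
                                long bytes, long execTimeMs,
                                float rateMbSec, String host) throws IOException {
        out.collect(new Text("l:tasks"), new Text("1"));
        out.collect(new Text("l:size"), new Text(String.valueOf(bytes)));
        out.collect(new Text("l:time"), new Text(String.valueOf(execTimeMs)));
        out.collect(new Text("f:rate"), new Text(String.valueOf(rateMbSec * 1000)));
        out.collect(new Text("s:host"), new Text(host));
      }
    }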
- * - */ -public class AccumulatingReducer extends MapReduceBase - implements Reducer { - static final String VALUE_TYPE_LONG = "l:"; - static final String VALUE_TYPE_FLOAT = "f:"; - static final String VALUE_TYPE_STRING = "s:"; - private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class); - - protected String hostName; - - public AccumulatingReducer () { - LOG.info("Starting AccumulatingReducer !!!"); - try { - hostName = java.net.InetAddress.getLocalHost().getHostName(); - } catch(Exception e) { - hostName = "localhost"; - } - LOG.info("Starting AccumulatingReducer on " + hostName); - } - - public void reduce(Text key, - Iterator values, - OutputCollector output, - Reporter reporter - ) throws IOException { - String field = key.toString(); - - reporter.setStatus("starting " + field + " ::host = " + hostName); - - // concatenate strings - if (field.startsWith(VALUE_TYPE_STRING)) { - String sSum = ""; - while (values.hasNext()) - sSum += values.next().toString() + ";"; - output.collect(key, new Text(sSum)); - reporter.setStatus("finished " + field + " ::host = " + hostName); - return; - } - // sum long values - if (field.startsWith(VALUE_TYPE_FLOAT)) { - float fSum = 0; - while (values.hasNext()) - fSum += Float.parseFloat(values.next().toString()); - output.collect(key, new Text(String.valueOf(fSum))); - reporter.setStatus("finished " + field + " ::host = " + hostName); - return; - } - // sum long values - if (field.startsWith(VALUE_TYPE_LONG)) { - long lSum = 0; - while (values.hasNext()) { - lSum += Long.parseLong(values.next().toString()); - } - output.collect(key, new Text(String.valueOf(lSum))); - } - reporter.setStatus("finished " + field + " ::host = " + hostName); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java deleted file mode 100644 index 7bc3e6a343..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java +++ /dev/null @@ -1,551 +0,0 @@ - /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintStream; -import java.util.Date; -import java.util.StringTokenizer; - -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; - -/** - * Distributed i/o benchmark. - *

- * This test writes into or reads from a specified number of files.
- * File size is specified as a parameter to the test.
- * Each file is accessed in a separate map task.
- * <p>
- * The reducer collects the following statistics:
- * <ul>
- * <li>number of tasks completed</li>
- * <li>number of bytes written/read</li>
- * <li>execution time</li>
- * <li>io rate</li>
- * <li>io rate squared</li>
- * </ul>
- *
- * Finally, the following information is appended to a local file:
- * <ul>
- * <li>read or write test</li>
- * <li>date and time the test finished</li>
- * <li>number of files</li>
- * <li>total number of bytes processed</li>
- * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
- * <li>average i/o rate in mb/sec per file</li>
- * <li>standard i/o rate deviation</li>
- * </ul>
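The appended summary values follow from the accumulated counters; a minimal sketch of that arithmetic with illustrative names (the computation itself appears in analyzeResult() later in this file):

    // Sketch of the summary arithmetic: throughput is total bytes over total
    // task time, the average rate is the mean of the per-file IO rates, and
    // the deviation is sqrt(E[rate^2] - E[rate]^2).  rate and sqrate arrive
    // from the reducer scaled by 1000.
    class ResultSummary {
      static final long MEGA = 0x100000;

      static double[] summarize(long sizeBytes, long timeMs, long tasks,
                                float rate, float sqrate) {
        double throughputMbSec = sizeBytes * 1000.0 / (timeMs * MEGA);
        double avgRateMbSec = rate / 1000 / tasks;
        double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks
                                           - avgRateMbSec * avgRateMbSec));
        return new double[] { throughputMbSec, avgRateMbSec, stdDev };
      }
    }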
- */ -public class DFSCIOTest extends TestCase { - // Constants - private static final Log LOG = LogFactory.getLog(DFSCIOTest.class); - private static final int TEST_TYPE_READ = 0; - private static final int TEST_TYPE_WRITE = 1; - private static final int TEST_TYPE_CLEANUP = 2; - private static final int DEFAULT_BUFFER_SIZE = 1000000; - private static final String BASE_FILE_NAME = "test_io_"; - private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log"; - - private static Configuration fsConfig = new Configuration(); - private static final long MEGA = 0x100000; - private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest"); - private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); - private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); - private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); - private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); - - private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest"); - private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1"); - private static String CHMOD = new String("chmod"); - private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION); - private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read"); - private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write"); - - /** - * Run the test with default parameters. - * - * @throws Exception - */ - public void testIOs() throws Exception { - testIOs(10, 10); - } - - /** - * Run the test with the specified parameters. - * - * @param fileSize file size - * @param nrFiles number of files - * @throws IOException - */ - public static void testIOs(int fileSize, int nrFiles) - throws IOException { - - FileSystem fs = FileSystem.get(fsConfig); - - createControlFile(fs, fileSize, nrFiles); - writeTest(fs); - readTest(fs); - } - - private static void createControlFile( - FileSystem fs, - int fileSize, // in MB - int nrFiles - ) throws IOException { - LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); - - fs.delete(CONTROL_DIR, true); - - for(int i=0; i < nrFiles; i++) { - String name = getFileName(i); - Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter(fs, fsConfig, controlFile, - Text.class, LongWritable.class, - CompressionType.NONE); - writer.append(new Text(name), new LongWritable(fileSize)); - } catch(Exception e) { - throw new IOException(e.getLocalizedMessage()); - } finally { - if (writer != null) - writer.close(); - writer = null; - } - } - LOG.info("created control files for: "+nrFiles+" files"); - } - - private static String getFileName(int fIdx) { - return BASE_FILE_NAME + Integer.toString(fIdx); - } - - /** - * Write/Read mapper base class. - *

- * Collects the following statistics per task:
- * <ul>
- * <li>number of tasks completed</li>
- * <li>number of bytes written/read</li>
- * <li>execution time</li>
- * <li>i/o rate</li>
- * <li>i/o rate squared</li>
- * </ul>
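A worked example with hypothetical numbers: a task that processes 10 MB in 2,000 ms runs at 5 mb/sec; the rate and its square are emitted scaled by 1000 so that a single reducer can total them and the mean and standard deviation can be recovered later.

    // Worked example with hypothetical numbers, mirroring collectStats() below.
    class IoRateExample {
      public static void main(String[] args) {
        final long MEGA = 0x100000;
        long totalSize = 10 * MEGA;   // bytes processed by this task
        long execTime = 2000;         // elapsed milliseconds
        float ioRateMbSec = (float) totalSize * 1000 / (execTime * MEGA); // 5.0
        System.out.println("f:rate   = " + ioRateMbSec * 1000);               // 5000.0
        System.out.println("f:sqrate = " + ioRateMbSec * ioRateMbSec * 1000); // 25000.0
      }
    }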
- */ - private abstract static class IOStatMapper extends IOMapperBase { - IOStatMapper() { - super(fsConfig); - } - - void collectStats(OutputCollector output, - String name, - long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); - float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); - LOG.info("Number of bytes processed = " + totalSize); - LOG.info("Exec time = " + execTime); - LOG.info("IO rate = " + ioRateMbSec); - - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), - new Text(String.valueOf(1))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), - new Text(String.valueOf(totalSize))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), - new Text(String.valueOf(execTime))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), - new Text(String.valueOf(ioRateMbSec*1000))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), - new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); - } - } - - /** - * Write mapper class. - */ - public static class WriteMapper extends IOStatMapper { - - public WriteMapper() { - super(); - for(int i=0; i < bufferSize; i++) - buffer[i] = (byte)('0' + i % 50); - } - - public Object doIO(Reporter reporter, - String name, - long totalSize - ) throws IOException { - // create file - totalSize *= MEGA; - - // create instance of local filesystem - FileSystem localFS = FileSystem.getLocal(fsConfig); - - try { - // native runtime - Runtime runTime = Runtime.getRuntime(); - - // copy the dso and executable from dfs and chmod them - synchronized (this) { - localFS.delete(HDFS_TEST_DIR, true); - if (!(localFS.mkdirs(HDFS_TEST_DIR))) { - throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); - } - } - - synchronized (this) { - if (!localFS.exists(HDFS_SHLIB)) { - FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig); - - String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); - Process process = runTime.exec(chmodCmd); - int exitStatus = process.waitFor(); - if (exitStatus != 0) { - throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); - } - } - } - - synchronized (this) { - if (!localFS.exists(HDFS_WRITE)) { - FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig); - - String chmodCmd = new String(CHMOD + " a+x " + HDFS_WRITE); - Process process = runTime.exec(chmodCmd); - int exitStatus = process.waitFor(); - if (exitStatus != 0) { - throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); - } - } - } - - // exec the C program - Path outFile = new Path(DATA_DIR, name); - String writeCmd = new String(HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize); - Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString())); - int exitStatus = process.waitFor(); - if (exitStatus != 0) { - throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus); - } - } catch (InterruptedException interruptedException) { - reporter.setStatus(interruptedException.toString()); - } finally { - localFS.close(); - } - return new Long(totalSize); - } - } - - private static void writeTest(FileSystem fs) - throws IOException { - - fs.delete(DATA_DIR, true); - fs.delete(WRITE_DIR, true); - - runIOTest(WriteMapper.class, WRITE_DIR); - } - - private static void runIOTest( Class mapperClass, - Path outputDir - ) throws IOException { - JobConf job = new JobConf(fsConfig, 
DFSCIOTest.class); - - FileInputFormat.setInputPaths(job, CONTROL_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(mapperClass); - job.setReducerClass(AccumulatingReducer.class); - - FileOutputFormat.setOutputPath(job, outputDir); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - /** - * Read mapper class. - */ - public static class ReadMapper extends IOStatMapper { - - public ReadMapper() { - super(); - } - - public Object doIO(Reporter reporter, - String name, - long totalSize - ) throws IOException { - totalSize *= MEGA; - - // create instance of local filesystem - FileSystem localFS = FileSystem.getLocal(fsConfig); - - try { - // native runtime - Runtime runTime = Runtime.getRuntime(); - - // copy the dso and executable from dfs - synchronized (this) { - localFS.delete(HDFS_TEST_DIR, true); - if (!(localFS.mkdirs(HDFS_TEST_DIR))) { - throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem"); - } - } - - synchronized (this) { - if (!localFS.exists(HDFS_SHLIB)) { - if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) { - throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem"); - } - - String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB); - Process process = runTime.exec(chmodCmd); - int exitStatus = process.waitFor(); - if (exitStatus != 0) { - throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); - } - } - } - - synchronized (this) { - if (!localFS.exists(HDFS_READ)) { - if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) { - throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem"); - } - - String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); - Process process = runTime.exec(chmodCmd); - int exitStatus = process.waitFor(); - - if (exitStatus != 0) { - throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus); - } - } - } - - // exec the C program - Path inFile = new Path(DATA_DIR, name); - String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " + - bufferSize); - Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString())); - int exitStatus = process.waitFor(); - - if (exitStatus != 0) { - throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus); - } - } catch (InterruptedException interruptedException) { - reporter.setStatus(interruptedException.toString()); - } finally { - localFS.close(); - } - return new Long(totalSize); - } - } - - private static void readTest(FileSystem fs) throws IOException { - fs.delete(READ_DIR, true); - runIOTest(ReadMapper.class, READ_DIR); - } - - private static void sequentialTest( - FileSystem fs, - int testType, - int fileSize, - int nrFiles - ) throws Exception { - IOStatMapper ioer = null; - if (testType == TEST_TYPE_READ) - ioer = new ReadMapper(); - else if (testType == TEST_TYPE_WRITE) - ioer = new WriteMapper(); - else - return; - for(int i=0; i < nrFiles; i++) - ioer.doIO(Reporter.NULL, - BASE_FILE_NAME+Integer.toString(i), - MEGA*fileSize); - } - - public static void main(String[] args) { - int testType = TEST_TYPE_READ; - int bufferSize = DEFAULT_BUFFER_SIZE; - int fileSize = 1; - int nrFiles = 1; - String resFileName = DEFAULT_RES_FILE_NAME; - boolean isSequential = false; - - String version="DFSCIOTest.0.0.1"; - String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] 
[-resFile resultFileName] [-bufferSize Bytes] "; - - System.out.println(version); - if (args.length == 0) { - System.err.println(usage); - System.exit(-1); - } - for (int i = 0; i < args.length; i++) { // parse command line - if (args[i].startsWith("-r")) { - testType = TEST_TYPE_READ; - } else if (args[i].startsWith("-w")) { - testType = TEST_TYPE_WRITE; - } else if (args[i].startsWith("-clean")) { - testType = TEST_TYPE_CLEANUP; - } else if (args[i].startsWith("-seq")) { - isSequential = true; - } else if (args[i].equals("-nrFiles")) { - nrFiles = Integer.parseInt(args[++i]); - } else if (args[i].equals("-fileSize")) { - fileSize = Integer.parseInt(args[++i]); - } else if (args[i].equals("-bufferSize")) { - bufferSize = Integer.parseInt(args[++i]); - } else if (args[i].equals("-resFile")) { - resFileName = args[++i]; - } - } - - LOG.info("nrFiles = " + nrFiles); - LOG.info("fileSize (MB) = " + fileSize); - LOG.info("bufferSize = " + bufferSize); - - try { - fsConfig.setInt("test.io.file.buffer.size", bufferSize); - FileSystem fs = FileSystem.get(fsConfig); - - if (testType != TEST_TYPE_CLEANUP) { - fs.delete(HDFS_TEST_DIR, true); - if (!fs.mkdirs(HDFS_TEST_DIR)) { - throw new IOException("Mkdirs failed to create " + - HDFS_TEST_DIR.toString()); - } - - //Copy the executables over to the remote filesystem - String hadoopHome = System.getenv("HADOOP_HOME"); - fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION), - HDFS_SHLIB); - fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ); - fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE); - } - - if (isSequential) { - long tStart = System.currentTimeMillis(); - sequentialTest(fs, testType, fileSize, nrFiles); - long execTime = System.currentTimeMillis() - tStart; - String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; - LOG.info(resultLine); - return; - } - if (testType == TEST_TYPE_CLEANUP) { - cleanup(fs); - return; - } - createControlFile(fs, fileSize, nrFiles); - long tStart = System.currentTimeMillis(); - if (testType == TEST_TYPE_WRITE) - writeTest(fs); - if (testType == TEST_TYPE_READ) - readTest(fs); - long execTime = System.currentTimeMillis() - tStart; - - analyzeResult(fs, testType, execTime, resFileName); - } catch(Exception e) { - System.err.print(e.getLocalizedMessage()); - System.exit(-1); - } - } - - private static void analyzeResult( FileSystem fs, - int testType, - long execTime, - String resFileName - ) throws IOException { - Path reduceFile; - if (testType == TEST_TYPE_WRITE) - reduceFile = new Path(WRITE_DIR, "part-00000"); - else - reduceFile = new Path(READ_DIR, "part-00000"); - DataInputStream in; - in = new DataInputStream(fs.open(reduceFile)); - - BufferedReader lines; - lines = new BufferedReader(new InputStreamReader(in)); - long tasks = 0; - long size = 0; - long time = 0; - float rate = 0; - float sqrate = 0; - String line; - while((line = lines.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); - String attr = tokens.nextToken(); - if (attr.endsWith(":tasks")) - tasks = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":size")) - size = Long.parseLong(tokens. 
nextToken()); - else if (attr.endsWith(":time")) - time = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":rate")) - rate = Float.parseFloat(tokens.nextToken()); - else if (attr.endsWith(":sqrate")) - sqrate = Float.parseFloat(tokens.nextToken()); - } - - double med = rate / 1000 / tasks; - double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); - String resultLines[] = { - "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : - (testType == TEST_TYPE_READ) ? "read" : - "unknown"), - " Date & time: " + new Date(System.currentTimeMillis()), - " Number of files: " + tasks, - "Total MBytes processed: " + size/MEGA, - " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), - "Average IO rate mb/sec: " + med, - " Std IO rate deviation: " + stdDev, - " Test exec time sec: " + (float)execTime / 1000, - "" }; - - PrintStream res = new PrintStream( - new FileOutputStream( - new File(resFileName), true)); - for(int i = 0; i < resultLines.length; i++) { - LOG.info(resultLines[i]); - res.println(resultLines[i]); - } - } - - private static void cleanup(FileSystem fs) throws Exception { - LOG.info("Cleaning up test files"); - fs.delete(new Path(TEST_ROOT_DIR), true); - fs.delete(HDFS_TEST_DIR, true); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java deleted file mode 100644 index 21d9cdfb93..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java +++ /dev/null @@ -1,353 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintStream; -import java.util.Date; -import java.util.StringTokenizer; -import java.util.TreeSet; -import java.util.Vector; - -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; - -/** - * Distributed checkup of the file system consistency. - *

- * Test file system consistency by reading each block of each file
- * of the specified file tree.
- * Report corrupted blocks and general file statistics.

- * Optionally displays statistics on read performance. - * - */ -public class DistributedFSCheck extends TestCase { - // Constants - private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class); - private static final int TEST_TYPE_READ = 0; - private static final int TEST_TYPE_CLEANUP = 2; - private static final int DEFAULT_BUFFER_SIZE = 1000000; - private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log"; - private static final long MEGA = 0x100000; - - private static Configuration fsConfig = new Configuration(); - private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck")); - private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input"); - private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); - - private FileSystem fs; - private long nrFiles; - - DistributedFSCheck(Configuration conf) throws Exception { - fsConfig = conf; - this.fs = FileSystem.get(conf); - } - - /** - * Run distributed checkup for the entire files system. - * - * @throws Exception - */ - public void testFSBlocks() throws Exception { - testFSBlocks("/"); - } - - /** - * Run distributed checkup for the specified directory. - * - * @param rootName root directory name - * @throws Exception - */ - public void testFSBlocks(String rootName) throws Exception { - createInputFile(rootName); - runDistributedFSCheck(); - cleanup(); // clean up after all to restore the system state - } - - private void createInputFile(String rootName) throws IOException { - cleanup(); // clean up if previous run failed - - Path inputFile = new Path(MAP_INPUT_DIR, "in_file"); - SequenceFile.Writer writer = - SequenceFile.createWriter(fs, fsConfig, inputFile, - Text.class, LongWritable.class, CompressionType.NONE); - - try { - nrFiles = 0; - listSubtree(new Path(rootName), writer); - } finally { - writer.close(); - } - LOG.info("Created map input files."); - } - - private void listSubtree(Path rootFile, - SequenceFile.Writer writer - ) throws IOException { - FileStatus rootStatus = fs.getFileStatus(rootFile); - listSubtree(rootStatus, writer); - } - - private void listSubtree(FileStatus rootStatus, - SequenceFile.Writer writer - ) throws IOException { - Path rootFile = rootStatus.getPath(); - if (!rootStatus.isDir()) { - nrFiles++; - // For a regular file generate pairs - long blockSize = fs.getDefaultBlockSize(); - long fileLength = rootStatus.getLen(); - for(long offset = 0; offset < fileLength; offset += blockSize) - writer.append(new Text(rootFile.toString()), new LongWritable(offset)); - return; - } - - FileStatus children[] = fs.listStatus(rootFile); - if (children == null) - throw new IOException("Could not get listing for " + rootFile); - for (int i = 0; i < children.length; i++) - listSubtree(children[i], writer); - } - - /** - * DistributedFSCheck mapper class. 
- */ - public static class DistributedFSCheckMapper extends IOMapperBase { - - public DistributedFSCheckMapper() { - super(fsConfig); - } - - public Object doIO(Reporter reporter, - String name, - long offset - ) throws IOException { - // open file - FSDataInputStream in = null; - try { - in = fs.open(new Path(name)); - } catch(IOException e) { - return name + "@(missing)"; - } - in.seek(offset); - long actualSize = 0; - try { - long blockSize = fs.getDefaultBlockSize(); - reporter.setStatus("reading " + name + "@" + - offset + "/" + blockSize); - for( int curSize = bufferSize; - curSize == bufferSize && actualSize < blockSize; - actualSize += curSize) { - curSize = in.read(buffer, 0, bufferSize); - } - } catch(IOException e) { - LOG.info("Corrupted block detected in \"" + name + "\" at " + offset); - return name + "@" + offset; - } finally { - in.close(); - } - return new Long(actualSize); - } - - void collectStats(OutputCollector output, - String name, - long execTime, - Object corruptedBlock) throws IOException { - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"), - new Text(String.valueOf(1))); - - if (corruptedBlock.getClass().getName().endsWith("String")) { - output.collect( - new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"), - new Text((String)corruptedBlock)); - return; - } - long totalSize = ((Long)corruptedBlock).longValue(); - float ioRateMbSec = (float)totalSize * 1000 / (execTime * 0x100000); - LOG.info("Number of bytes processed = " + totalSize); - LOG.info("Exec time = " + execTime); - LOG.info("IO rate = " + ioRateMbSec); - - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), - new Text(String.valueOf(totalSize))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), - new Text(String.valueOf(execTime))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), - new Text(String.valueOf(ioRateMbSec*1000))); - } - } - - private void runDistributedFSCheck() throws Exception { - JobConf job = new JobConf(fs.getConf(), DistributedFSCheck.class); - - FileInputFormat.setInputPaths(job, MAP_INPUT_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(DistributedFSCheckMapper.class); - job.setReducerClass(AccumulatingReducer.class); - - FileOutputFormat.setOutputPath(job, READ_DIR); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - public static void main(String[] args) throws Exception { - int testType = TEST_TYPE_READ; - int bufferSize = DEFAULT_BUFFER_SIZE; - String resFileName = DEFAULT_RES_FILE_NAME; - String rootName = "/"; - boolean viewStats = false; - - String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] "; - - if (args.length == 1 && args[0].startsWith("-h")) { - System.err.println(usage); - System.exit(-1); - } - for(int i = 0; i < args.length; i++) { // parse command line - if (args[i].equals("-root")) { - rootName = args[++i]; - } else if (args[i].startsWith("-clean")) { - testType = TEST_TYPE_CLEANUP; - } else if (args[i].equals("-bufferSize")) { - bufferSize = Integer.parseInt(args[++i]); - } else if (args[i].equals("-resFile")) { - resFileName = args[++i]; - } else if (args[i].startsWith("-stat")) { - viewStats = true; - } - } - - LOG.info("root = " + rootName); - LOG.info("bufferSize = " + bufferSize); - - Configuration conf = new Configuration(); - conf.setInt("test.io.file.buffer.size", 
bufferSize); - DistributedFSCheck test = new DistributedFSCheck(conf); - - if (testType == TEST_TYPE_CLEANUP) { - test.cleanup(); - return; - } - test.createInputFile(rootName); - long tStart = System.currentTimeMillis(); - test.runDistributedFSCheck(); - long execTime = System.currentTimeMillis() - tStart; - - test.analyzeResult(execTime, resFileName, viewStats); - // test.cleanup(); // clean up after all to restore the system state - } - - private void analyzeResult(long execTime, - String resFileName, - boolean viewStats - ) throws IOException { - Path reduceFile= new Path(READ_DIR, "part-00000"); - DataInputStream in; - in = new DataInputStream(fs.open(reduceFile)); - - BufferedReader lines; - lines = new BufferedReader(new InputStreamReader(in)); - long blocks = 0; - long size = 0; - long time = 0; - float rate = 0; - StringTokenizer badBlocks = null; - long nrBadBlocks = 0; - String line; - while((line = lines.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); - String attr = tokens.nextToken(); - if (attr.endsWith("blocks")) - blocks = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith("size")) - size = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith("time")) - time = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith("rate")) - rate = Float.parseFloat(tokens.nextToken()); - else if (attr.endsWith("badBlocks")) { - badBlocks = new StringTokenizer(tokens.nextToken(), ";"); - nrBadBlocks = badBlocks.countTokens(); - } - } - - Vector resultLines = new Vector(); - resultLines.add( "----- DistributedFSCheck ----- : "); - resultLines.add( " Date & time: " + new Date(System.currentTimeMillis())); - resultLines.add( " Total number of blocks: " + blocks); - resultLines.add( " Total number of files: " + nrFiles); - resultLines.add( "Number of corrupted blocks: " + nrBadBlocks); - - int nrBadFilesPos = resultLines.size(); - TreeSet badFiles = new TreeSet(); - long nrBadFiles = 0; - if (nrBadBlocks > 0) { - resultLines.add(""); - resultLines.add("----- Corrupted Blocks (file@offset) ----- : "); - while(badBlocks.hasMoreTokens()) { - String curBlock = badBlocks.nextToken(); - resultLines.add(curBlock); - badFiles.add(curBlock.substring(0, curBlock.indexOf('@'))); - } - nrBadFiles = badFiles.size(); - } - - resultLines.insertElementAt(" Number of corrupted files: " + nrBadFiles, nrBadFilesPos); - - if (viewStats) { - resultLines.add(""); - resultLines.add("----- Performance ----- : "); - resultLines.add(" Total MBytes read: " + size/MEGA); - resultLines.add(" Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA)); - resultLines.add(" Average IO rate mb/sec: " + rate / 1000 / blocks); - resultLines.add(" Test exec time sec: " + (float)execTime / 1000); - } - - PrintStream res = new PrintStream( - new FileOutputStream( - new File(resFileName), true)); - for(int i = 0; i < resultLines.size(); i++) { - String cur = resultLines.get(i); - LOG.info(cur); - res.println(cur); - } - } - - private void cleanup() throws IOException { - LOG.info("Cleaning up test files"); - fs.delete(TEST_ROOT_DIR, true); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java deleted file mode 100644 index 672bf89594..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import java.io.IOException; -import java.net.InetAddress; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; - -/** - * Base mapper class for IO operations. - *

- * Two abstract method {@link #doIO(Reporter, String, long)} and - * {@link #collectStats(OutputCollector,String,long,Object)} should be - * overloaded in derived classes to define the IO operation and the - * statistics data to be collected by subsequent reducers. - * - */ -public abstract class IOMapperBase extends Configured - implements Mapper { - - protected byte[] buffer; - protected int bufferSize; - protected FileSystem fs; - protected String hostName; - - public IOMapperBase(Configuration conf) { - super(conf); - try { - fs = FileSystem.get(conf); - } catch (Exception e) { - throw new RuntimeException("Cannot create file system.", e); - } - bufferSize = conf.getInt("test.io.file.buffer.size", 4096); - buffer = new byte[bufferSize]; - try { - hostName = InetAddress.getLocalHost().getHostName(); - } catch(Exception e) { - hostName = "localhost"; - } - } - - public void configure(JobConf job) { - setConf(job); - } - - public void close() throws IOException { - } - - /** - * Perform io operation, usually read or write. - * - * @param reporter - * @param name file name - * @param value offset within the file - * @return object that is passed as a parameter to - * {@link #collectStats(OutputCollector,String,long,Object)} - * @throws IOException - */ - abstract Object doIO(Reporter reporter, - String name, - long value) throws IOException; - - /** - * Collect stat data to be combined by a subsequent reducer. - * - * @param output - * @param name file name - * @param execTime IO execution time - * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)} - * @throws IOException - */ - abstract void collectStats(OutputCollector output, - String name, - long execTime, - Object doIOReturnValue) throws IOException; - - /** - * Map file name and offset into statistical data. - *

- * The map task is to get the - * key, which contains the file name, and the - * value, which is the offset within the file. - * - * The parameters are passed to the abstract method - * {@link #doIO(Reporter,String,long)}, which performs the io operation, - * usually read or write data, and then - * {@link #collectStats(OutputCollector,String,long,Object)} - * is called to prepare stat data for a subsequent reducer. - */ - public void map(Text key, - LongWritable value, - OutputCollector output, - Reporter reporter) throws IOException { - String name = key.toString(); - long longValue = value.get(); - - reporter.setStatus("starting " + name + " ::host = " + hostName); - - long tStart = System.currentTimeMillis(); - Object statValue = doIO(reporter, name, longValue); - long tEnd = System.currentTimeMillis(); - long execTime = tEnd - tStart; - collectStats(output, name, execTime, statValue); - - reporter.setStatus("finished " + name + " ::host = " + hostName); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java deleted file mode 100644 index a64d24dcaf..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestCopyFiles.java +++ /dev/null @@ -1,853 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs; - -import java.io.ByteArrayOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.StringTokenizer; - -import junit.framework.TestCase; - -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.mapred.MiniMRCluster; -import org.apache.hadoop.security.UnixUserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.tools.DistCp; -import org.apache.hadoop.util.ToolRunner; -import org.apache.log4j.Level; - - -/** - * A JUnit test for copying files recursively. 
- */ -public class TestCopyFiles extends TestCase { - { - ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange") - ).getLogger().setLevel(Level.OFF); - ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF); - ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF); - ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL); - } - - static final URI LOCAL_FS = URI.create("file:///"); - - private static final Random RAN = new Random(); - private static final int NFILES = 20; - private static String TEST_ROOT_DIR = - new Path(System.getProperty("test.build.data","/tmp")) - .toString().replace(' ', '+'); - - /** class MyFile contains enough information to recreate the contents of - * a single file. - */ - private static class MyFile { - private static Random gen = new Random(); - private static final int MAX_LEVELS = 3; - private static final int MAX_SIZE = 8*1024; - private static String[] dirNames = { - "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine" - }; - private final String name; - private int size = 0; - private long seed = 0L; - - MyFile() { - this(gen.nextInt(MAX_LEVELS)); - } - MyFile(int nLevels) { - String xname = ""; - if (nLevels != 0) { - int[] levels = new int[nLevels]; - for (int idx = 0; idx < nLevels; idx++) { - levels[idx] = gen.nextInt(10); - } - StringBuffer sb = new StringBuffer(); - for (int idx = 0; idx < nLevels; idx++) { - sb.append(dirNames[levels[idx]]); - sb.append("/"); - } - xname = sb.toString(); - } - long fidx = gen.nextLong() & Long.MAX_VALUE; - name = xname + Long.toString(fidx); - reset(); - } - void reset() { - final int oldsize = size; - do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size); - final long oldseed = seed; - do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed); - } - String getName() { return name; } - int getSize() { return size; } - long getSeed() { return seed; } - } - - private static MyFile[] createFiles(URI fsname, String topdir) - throws IOException { - return createFiles(FileSystem.get(fsname, new Configuration()), topdir); - } - - /** create NFILES with random names and directory hierarchies - * with random (but reproducible) data in them. - */ - private static MyFile[] createFiles(FileSystem fs, String topdir) - throws IOException { - Path root = new Path(topdir); - MyFile[] files = new MyFile[NFILES]; - for (int i = 0; i < NFILES; i++) { - files[i] = createFile(root, fs); - } - return files; - } - - static MyFile createFile(Path root, FileSystem fs, int levels) - throws IOException { - MyFile f = levels < 0 ? 
new MyFile() : new MyFile(levels); - Path p = new Path(root, f.getName()); - FSDataOutputStream out = fs.create(p); - byte[] toWrite = new byte[f.getSize()]; - new Random(f.getSeed()).nextBytes(toWrite); - out.write(toWrite); - out.close(); - FileSystem.LOG.info("created: " + p + ", size=" + f.getSize()); - return f; - } - - static MyFile createFile(Path root, FileSystem fs) throws IOException { - return createFile(root, fs, -1); - } - - private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files - ) throws IOException { - return checkFiles(fs, topdir, files, false); - } - - private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files, - boolean existingOnly) throws IOException { - Path root = new Path(topdir); - - for (int idx = 0; idx < files.length; idx++) { - Path fPath = new Path(root, files[idx].getName()); - try { - fs.getFileStatus(fPath); - FSDataInputStream in = fs.open(fPath); - byte[] toRead = new byte[files[idx].getSize()]; - byte[] toCompare = new byte[files[idx].getSize()]; - Random rb = new Random(files[idx].getSeed()); - rb.nextBytes(toCompare); - assertEquals("Cannnot read file.", toRead.length, in.read(toRead)); - in.close(); - for (int i = 0; i < toRead.length; i++) { - if (toRead[i] != toCompare[i]) { - return false; - } - } - toRead = null; - toCompare = null; - } - catch(FileNotFoundException fnfe) { - if (!existingOnly) { - throw fnfe; - } - } - } - - return true; - } - - private static void updateFiles(FileSystem fs, String topdir, MyFile[] files, - int nupdate) throws IOException { - assert nupdate <= NFILES; - - Path root = new Path(topdir); - - for (int idx = 0; idx < nupdate; ++idx) { - Path fPath = new Path(root, files[idx].getName()); - // overwrite file - assertTrue(fPath.toString() + " does not exist", fs.exists(fPath)); - FSDataOutputStream out = fs.create(fPath); - files[idx].reset(); - byte[] toWrite = new byte[files[idx].getSize()]; - Random rb = new Random(files[idx].getSeed()); - rb.nextBytes(toWrite); - out.write(toWrite); - out.close(); - } - } - - private static FileStatus[] getFileStatus(FileSystem fs, - String topdir, MyFile[] files) throws IOException { - return getFileStatus(fs, topdir, files, false); - } - private static FileStatus[] getFileStatus(FileSystem fs, - String topdir, MyFile[] files, boolean existingOnly) throws IOException { - Path root = new Path(topdir); - List statuses = new ArrayList(); - for (int idx = 0; idx < NFILES; ++idx) { - try { - statuses.add(fs.getFileStatus(new Path(root, files[idx].getName()))); - } catch(FileNotFoundException fnfe) { - if (!existingOnly) { - throw fnfe; - } - } - } - return statuses.toArray(new FileStatus[statuses.size()]); - } - - private static boolean checkUpdate(FileSystem fs, FileStatus[] old, - String topdir, MyFile[] upd, final int nupdate) throws IOException { - Path root = new Path(topdir); - - // overwrote updated files - for (int idx = 0; idx < nupdate; ++idx) { - final FileStatus stat = - fs.getFileStatus(new Path(root, upd[idx].getName())); - if (stat.getModificationTime() <= old[idx].getModificationTime()) { - return false; - } - } - // did not overwrite files not updated - for (int idx = nupdate; idx < NFILES; ++idx) { - final FileStatus stat = - fs.getFileStatus(new Path(root, upd[idx].getName())); - if (stat.getModificationTime() != old[idx].getModificationTime()) { - return false; - } - } - return true; - } - - /** delete directory and everything underneath it.*/ - private static void deldir(FileSystem fs, String topdir) throws IOException { - 
fs.delete(new Path(topdir), true); - } - - /** copy files from local file system to local file system */ - public void testCopyFromLocalToLocal() throws Exception { - Configuration conf = new Configuration(); - FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(localfs, TEST_ROOT_DIR+"/srcdat"); - } - - /** copy files from dfs file system to dfs file system */ - public void testCopyFromDfsToDfs() throws Exception { - String namenode = null; - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster(conf, 2, true, null); - final FileSystem hdfs = cluster.getFileSystem(); - namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCp(conf), new String[] { - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); - assertTrue("Log directory does not exist.", - fs.exists(new Path(namenode+"/logs"))); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** copy files from local file system to dfs file system */ - public void testCopyFromLocalToDfs() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster(conf, 1, true, null); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = hdfs.getUri().toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCp(conf), new String[] { - "-log", - namenode+"/logs", - "file:///"+TEST_ROOT_DIR+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(cluster.getFileSystem(), "/destdat", files)); - assertTrue("Log directory does not exist.", - hdfs.exists(new Path(namenode+"/logs"))); - deldir(hdfs, "/destdat"); - deldir(hdfs, "/logs"); - deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** copy files from dfs file system to local file system */ - public void testCopyFromDfsToLocal() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); - cluster = new MiniDFSCluster(conf, 1, true, null); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = FileSystem.getDefaultUri(conf).toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCp(conf), new String[] { - "-log", - "/logs", - namenode+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files)); - 
assertTrue("Log directory does not exist.", - hdfs.exists(new Path("/logs"))); - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(hdfs, "/logs"); - deldir(hdfs, "/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - public void testCopyDfsToDfsUpdateOverwrite() throws Exception { - MiniDFSCluster cluster = null; - try { - Configuration conf = new Configuration(); - cluster = new MiniDFSCluster(conf, 2, true, null); - final FileSystem hdfs = cluster.getFileSystem(); - final String namenode = hdfs.getUri().toString(); - if (namenode.startsWith("hdfs://")) { - MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); - ToolRunner.run(new DistCp(conf), new String[] { - "-p", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf); - assertTrue("Log directory does not exist.", - fs.exists(new Path(namenode+"/logs"))); - - FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files); - final int nupdate = NFILES>>2; - updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate); - deldir(hdfs, "/logs"); - - ToolRunner.run(new DistCp(conf), new String[] { - "-p", - "-update", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - assertTrue("Update failed to replicate all changes in src", - checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate)); - - deldir(hdfs, "/logs"); - ToolRunner.run(new DistCp(conf), new String[] { - "-p", - "-overwrite", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(hdfs, "/destdat", files)); - assertTrue("-overwrite didn't.", - checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES)); - - deldir(hdfs, "/destdat"); - deldir(hdfs, "/srcdat"); - deldir(hdfs, "/logs"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - public void testCopyDuplication() throws Exception { - final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration()); - try { - MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat"); - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files)); - - assertEquals(DistCp.DuplicationException.ERROR_CODE, - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/src2/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat",})); - } - finally { - deldir(localfs, TEST_ROOT_DIR+"/destdat"); - deldir(localfs, TEST_ROOT_DIR+"/srcdat"); - deldir(localfs, TEST_ROOT_DIR+"/src2"); - } - } - - public void testCopySingleFile() throws Exception { - FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration()); - Path root = new Path(TEST_ROOT_DIR+"/srcdat"); - try { - MyFile[] files = {createFile(root, fs)}; - //copy a dir with a single file - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat", - "file:///"+TEST_ROOT_DIR+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/destdat", 
files)); - - //copy a single file - String fname = files[0].getName(); - Path p = new Path(root, fname); - FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p)); - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname, - "file:///"+TEST_ROOT_DIR+"/dest2/"+fname}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files)); - //copy single file to existing dir - deldir(fs, TEST_ROOT_DIR+"/dest2"); - fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2")); - MyFile[] files2 = {createFile(root, fs, 0)}; - String sname = files2[0].getName(); - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); - updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1); - //copy single file to existing dir w/ dst name conflict - ToolRunner.run(new DistCp(new Configuration()), - new String[] {"-update", - "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname, - "file:///"+TEST_ROOT_DIR+"/dest2/"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2)); - } - finally { - deldir(fs, TEST_ROOT_DIR+"/destdat"); - deldir(fs, TEST_ROOT_DIR+"/dest2"); - deldir(fs, TEST_ROOT_DIR+"/srcdat"); - } - } - - public void testPreserveOption() throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(conf, 2, true, null); - String nnUri = FileSystem.getDefaultUri(conf).toString(); - FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - - {//test preserving user - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - for(int i = 0; i < srcstat.length; i++) { - fs.setOwner(srcstat[i].getPath(), "u" + i, null); - } - ToolRunner.run(new DistCp(conf), - new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, "u" + i, dststat[i].getOwner()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - - {//test preserving group - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - for(int i = 0; i < srcstat.length; i++) { - fs.setOwner(srcstat[i].getPath(), null, "g" + i); - } - ToolRunner.run(new DistCp(conf), - new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, "g" + i, dststat[i].getGroup()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - - {//test preserving mode - MyFile[] files = createFiles(URI.create(nnUri), "/srcdat"); - FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files); - FsPermission[] permissions = new FsPermission[srcstat.length]; - for(int i = 0; i < srcstat.length; i++) { - permissions[i] = new FsPermission((short)(i & 0666)); - fs.setPermission(srcstat[i].getPath(), permissions[i]); - } - - ToolRunner.run(new DistCp(conf), - new 
String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - FileStatus[] dststat = getFileStatus(fs, "/destdat", files); - for(int i = 0; i < dststat.length; i++) { - assertEquals("i=" + i, permissions[i], dststat[i].getPermission()); - } - deldir(fs, "/destdat"); - deldir(fs, "/srcdat"); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - public void testMapCount() throws Exception { - String namenode = null; - MiniDFSCluster dfs = null; - MiniMRCluster mr = null; - try { - Configuration conf = new Configuration(); - dfs = new MiniDFSCluster(conf, 3, true, null); - FileSystem fs = dfs.getFileSystem(); - final FsShell shell = new FsShell(conf); - namenode = fs.getUri().toString(); - mr = new MiniMRCluster(3, namenode, 1); - MyFile[] files = createFiles(fs.getUri(), "/srcdat"); - long totsize = 0; - for (MyFile f : files) { - totsize += f.getSize(); - } - Configuration job = mr.createJobConf(); - job.setLong("distcp.bytes.per.map", totsize / 3); - ToolRunner.run(new DistCp(job), - new String[] {"-m", "100", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - assertTrue("Source and destination directories do not match.", - checkFiles(fs, "/destdat", files)); - - String logdir = namenode + "/logs"; - System.out.println(execCmd(shell, "-lsr", logdir)); - FileStatus[] logs = fs.listStatus(new Path(logdir)); - // rare case where splits are exact, logs.length can be 4 - assertTrue("Unexpected map count, logs.length=" + logs.length, - logs.length == 5 || logs.length == 4); - - deldir(fs, "/destdat"); - deldir(fs, "/logs"); - ToolRunner.run(new DistCp(job), - new String[] {"-m", "1", - "-log", - namenode+"/logs", - namenode+"/srcdat", - namenode+"/destdat"}); - - System.out.println(execCmd(shell, "-lsr", logdir)); - logs = fs.listStatus(new Path(namenode+"/logs")); - assertTrue("Unexpected map count, logs.length=" + logs.length, - logs.length == 2); - } finally { - if (dfs != null) { dfs.shutdown(); } - if (mr != null) { mr.shutdown(); } - } - } - - public void testLimits() throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(conf, 2, true, null); - final String nnUri = FileSystem.getDefaultUri(conf).toString(); - final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - final DistCp distcp = new DistCp(conf); - final FsShell shell = new FsShell(conf); - - final String srcrootdir = "/src_root"; - final Path srcrootpath = new Path(srcrootdir); - final String dstrootdir = "/dst_root"; - final Path dstrootpath = new Path(dstrootdir); - - {//test -filelimit - MyFile[] files = createFiles(URI.create(nnUri), srcrootdir); - int filelimit = files.length / 2; - System.out.println("filelimit=" + filelimit); - - ToolRunner.run(distcp, - new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir}); - String results = execCmd(shell, "-lsr", dstrootdir); - results = removePrefix(results, dstrootdir); - System.out.println("results=" + results); - - FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true); - assertEquals(filelimit, dststat.length); - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - - {//test -sizelimit - createFiles(URI.create(nnUri), srcrootdir); - long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2; - System.out.println("sizelimit=" + sizelimit); - - ToolRunner.run(distcp, - new String[]{"-sizelimit", 
""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir}); - - ContentSummary summary = fs.getContentSummary(dstrootpath); - System.out.println("summary=" + summary); - assertTrue(summary.getLength() <= sizelimit); - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - - {//test update - final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir); - final long totalsize = fs.getContentSummary(srcrootpath).getLength(); - System.out.println("src.length=" + srcs.length); - System.out.println("totalsize =" + totalsize); - fs.mkdirs(dstrootpath); - final int parts = RAN.nextInt(NFILES/3 - 1) + 2; - final int filelimit = srcs.length/parts; - final long sizelimit = totalsize/parts; - System.out.println("filelimit=" + filelimit); - System.out.println("sizelimit=" + sizelimit); - System.out.println("parts =" + parts); - final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit, - "-update", nnUri+srcrootdir, nnUri+dstrootdir}; - - int dstfilecount = 0; - long dstsize = 0; - for(int i = 0; i <= parts; i++) { - ToolRunner.run(distcp, args); - - FileStatus[] dststat = getFileStatus(fs, dstrootdir, srcs, true); - System.out.println(i + ") dststat.length=" + dststat.length); - assertTrue(dststat.length - dstfilecount <= filelimit); - ContentSummary summary = fs.getContentSummary(dstrootpath); - System.out.println(i + ") summary.getLength()=" + summary.getLength()); - assertTrue(summary.getLength() - dstsize <= sizelimit); - assertTrue(checkFiles(fs, dstrootdir, srcs, true)); - dstfilecount = dststat.length; - dstsize = summary.getLength(); - } - - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - static final long now = System.currentTimeMillis(); - - static UnixUserGroupInformation createUGI(String name, boolean issuper) { - String username = name + now; - String group = issuper? 
"supergroup": username; - return UnixUserGroupInformation.createImmutable( - new String[]{username, group}); - } - - static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi - ) throws IOException { - final Path home = new Path("/user/" + ugi.getUserName()); - fs.mkdirs(home); - fs.setOwner(home, ugi.getUserName(), ugi.getGroupNames()[0]); - fs.setPermission(home, new FsPermission((short)0700)); - return home; - } - - public void testHftpAccessControl() throws Exception { - MiniDFSCluster cluster = null; - try { - final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); - final UnixUserGroupInformation USER_UGI = createUGI("user", false); - - //start cluster by DFS_UGI - final Configuration dfsConf = new Configuration(); - UnixUserGroupInformation.saveToConf(dfsConf, - UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI); - cluster = new MiniDFSCluster(dfsConf, 2, true, null); - cluster.waitActive(); - - final String httpAdd = dfsConf.get("dfs.http.address"); - final URI nnURI = FileSystem.getDefaultUri(dfsConf); - final String nnUri = nnURI.toString(); - final Path home = createHomeDirectory(FileSystem.get(nnURI, dfsConf), USER_UGI); - - //now, login as USER_UGI - final Configuration userConf = new Configuration(); - UnixUserGroupInformation.saveToConf(userConf, - UnixUserGroupInformation.UGI_PROPERTY_NAME, USER_UGI); - final FileSystem fs = FileSystem.get(nnURI, userConf); - - final Path srcrootpath = new Path(home, "src_root"); - final String srcrootdir = srcrootpath.toString(); - final Path dstrootpath = new Path(home, "dst_root"); - final String dstrootdir = dstrootpath.toString(); - final DistCp distcp = new DistCp(userConf); - - FileSystem.mkdirs(fs, srcrootpath, new FsPermission((short)0700)); - final String[] args = {"hftp://"+httpAdd+srcrootdir, nnUri+dstrootdir}; - - { //copy with permission 000, should fail - fs.setPermission(srcrootpath, new FsPermission((short)0)); - assertEquals(-3, ToolRunner.run(distcp, args)); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - /** test -delete */ - public void testDelete() throws Exception { - final Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(conf, 2, true, null); - final URI nnURI = FileSystem.getDefaultUri(conf); - final String nnUri = nnURI.toString(); - final FileSystem fs = FileSystem.get(URI.create(nnUri), conf); - - final DistCp distcp = new DistCp(conf); - final FsShell shell = new FsShell(conf); - - final String srcrootdir = "/src_root"; - final String dstrootdir = "/dst_root"; - - { - //create source files - createFiles(nnURI, srcrootdir); - String srcresults = execCmd(shell, "-lsr", srcrootdir); - srcresults = removePrefix(srcresults, srcrootdir); - System.out.println("srcresults=" + srcresults); - - //create some files in dst - createFiles(nnURI, dstrootdir); - System.out.println("dstrootdir=" + dstrootdir); - shell.run(new String[]{"-lsr", dstrootdir}); - - //run distcp - ToolRunner.run(distcp, - new String[]{"-delete", "-update", "-log", "/log", - nnUri+srcrootdir, nnUri+dstrootdir}); - - //make sure src and dst contains the same files - String dstresults = execCmd(shell, "-lsr", dstrootdir); - dstresults = removePrefix(dstresults, dstrootdir); - System.out.println("first dstresults=" + dstresults); - assertEquals(srcresults, dstresults); - - //create additional file in dst - create(fs, new Path(dstrootdir, "foo")); - create(fs, new Path(dstrootdir, "foobar")); - - //run distcp again - ToolRunner.run(distcp, 
- new String[]{"-delete", "-update", "-log", "/log2", - nnUri+srcrootdir, nnUri+dstrootdir}); - - //make sure src and dst contains the same files - dstresults = execCmd(shell, "-lsr", dstrootdir); - dstresults = removePrefix(dstresults, dstrootdir); - System.out.println("second dstresults=" + dstresults); - assertEquals(srcresults, dstresults); - - //cleanup - deldir(fs, dstrootdir); - deldir(fs, srcrootdir); - } - } finally { - if (cluster != null) { cluster.shutdown(); } - } - } - - static void create(FileSystem fs, Path f) throws IOException { - FSDataOutputStream out = fs.create(f); - try { - byte[] b = new byte[1024 + RAN.nextInt(1024)]; - RAN.nextBytes(b); - out.write(b); - } finally { - if (out != null) out.close(); - } - } - - static String execCmd(FsShell shell, String... args) throws Exception { - ByteArrayOutputStream baout = new ByteArrayOutputStream(); - PrintStream out = new PrintStream(baout, true); - PrintStream old = System.out; - System.setOut(out); - shell.run(args); - out.close(); - System.setOut(old); - return baout.toString(); - } - - private static String removePrefix(String lines, String prefix) { - final int prefixlen = prefix.length(); - final StringTokenizer t = new StringTokenizer(lines, "\n"); - final StringBuffer results = new StringBuffer(); - for(; t.hasMoreTokens(); ) { - String s = t.nextToken(); - results.append(s.substring(s.indexOf(prefix) + prefixlen) + "\n"); - } - return results.toString(); - } -} \ No newline at end of file diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java deleted file mode 100644 index b21914eb01..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java +++ /dev/null @@ -1,445 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs; - -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.PrintStream; -import java.util.Date; -import java.util.StringTokenizer; - -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.StringUtils; - -/** - * Distributed i/o benchmark. - *
<p> - * This test writes into or reads from a specified number of files. - * File size is specified as a parameter to the test. - * Each file is accessed in a separate map task. - * <p> - * The reducer collects the following statistics: - * <ul> - * <li>number of tasks completed</li> - * <li>number of bytes written/read</li> - * <li>execution time</li> - * <li>io rate</li> - * <li>io rate squared</li> - * </ul> - * - * Finally, the following information is appended to a local file - * <ul> - * <li>read or write test</li> - * <li>date and time the test finished</li> - * <li>number of files</li> - * <li>total number of bytes processed</li> - * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li> - * <li>average i/o rate in mb/sec per file</li> - * <li>standard deviation of i/o rate</li> - * </ul>
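For orientation (this aside is not part of the deleted file): the appended summary is computed by analyzeResult(), further down in this file, from the five sums the reducer accumulates (tasks, size, time, rate, sqrate). The sketch below restates that arithmetic outside the MapReduce plumbing; the class and method names are illustrative only, and rate/sqrate are assumed to arrive pre-scaled by 1000, as collectStats() emits them.

    // Illustrative restatement of the TestDFSIO summary arithmetic.
    // tasks, size (bytes) and time (ms) come from the ":tasks", ":size" and ":time"
    // reducer outputs; rate and sqrate are the ":rate" and ":sqrate" outputs, scaled by 1000.
    final class DfsioSummarySketch {
      private static final long MEGA = 0x100000;

      static double throughputMbSec(long sizeBytes, long timeMillis) {
        return sizeBytes * 1000.0 / (timeMillis * MEGA);   // total bytes / summed task time
      }

      static double averageIoRateMbSec(float rate, long tasks) {
        return rate / 1000.0 / tasks;                      // mean of the per-file i/o rates
      }

      static double ioRateStdDeviation(float rate, float sqrate, long tasks) {
        double mean = averageIoRateMbSec(rate, tasks);
        return Math.sqrt(Math.abs(sqrate / 1000.0 / tasks - mean * mean));
      }
    }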
- */ -public class TestDFSIO extends TestCase { - // Constants - private static final Log LOG = LogFactory.getLog(TestDFSIO.class); - private static final int TEST_TYPE_READ = 0; - private static final int TEST_TYPE_WRITE = 1; - private static final int TEST_TYPE_CLEANUP = 2; - private static final int DEFAULT_BUFFER_SIZE = 1000000; - private static final String BASE_FILE_NAME = "test_io_"; - private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; - - private static Configuration fsConfig = new Configuration(); - private static final long MEGA = 0x100000; - private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO"); - private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); - private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); - private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); - private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); - - /** - * Run the test with default parameters. - * - * @throws Exception - */ - public void testIOs() throws Exception { - testIOs(10, 10); - } - - /** - * Run the test with the specified parameters. - * - * @param fileSize file size - * @param nrFiles number of files - * @throws IOException - */ - public static void testIOs(int fileSize, int nrFiles) - throws IOException { - - FileSystem fs = FileSystem.get(fsConfig); - - createControlFile(fs, fileSize, nrFiles); - writeTest(fs); - readTest(fs); - cleanup(fs); - } - - private static void createControlFile( - FileSystem fs, - int fileSize, // in MB - int nrFiles - ) throws IOException { - LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); - - fs.delete(CONTROL_DIR, true); - - for(int i=0; i < nrFiles; i++) { - String name = getFileName(i); - Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter(fs, fsConfig, controlFile, - Text.class, LongWritable.class, - CompressionType.NONE); - writer.append(new Text(name), new LongWritable(fileSize)); - } catch(Exception e) { - throw new IOException(e.getLocalizedMessage()); - } finally { - if (writer != null) - writer.close(); - writer = null; - } - } - LOG.info("created control files for: "+nrFiles+" files"); - } - - private static String getFileName(int fIdx) { - return BASE_FILE_NAME + Integer.toString(fIdx); - } - - /** - * Write/Read mapper base class. - *
<p> - * Collects the following statistics per task: - * <ul> - * <li>number of tasks completed</li> - * <li>number of bytes written/read</li> - * <li>execution time</li> - * <li>i/o rate</li> - * <li>i/o rate squared</li> - * </ul>
- */ - private abstract static class IOStatMapper extends IOMapperBase { - IOStatMapper() { - super(fsConfig); - } - - void collectStats(OutputCollector output, - String name, - long execTime, - Object objSize) throws IOException { - long totalSize = ((Long)objSize).longValue(); - float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); - LOG.info("Number of bytes processed = " + totalSize); - LOG.info("Exec time = " + execTime); - LOG.info("IO rate = " + ioRateMbSec); - - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"), - new Text(String.valueOf(1))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"), - new Text(String.valueOf(totalSize))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"), - new Text(String.valueOf(execTime))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"), - new Text(String.valueOf(ioRateMbSec*1000))); - output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"), - new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); - } - } - - /** - * Write mapper class. - */ - public static class WriteMapper extends IOStatMapper { - - public WriteMapper() { - super(); - for(int i=0; i < bufferSize; i++) - buffer[i] = (byte)('0' + i % 50); - } - - public Object doIO(Reporter reporter, - String name, - long totalSize - ) throws IOException { - // create file - totalSize *= MEGA; - OutputStream out; - out = fs.create(new Path(DATA_DIR, name), true, bufferSize); - - try { - // write to the file - long nrRemaining; - for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) { - int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; - out.write(buffer, 0, curSize); - reporter.setStatus("writing " + name + "@" + - (totalSize - nrRemaining) + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - out.close(); - } - return new Long(totalSize); - } - } - - private static void writeTest(FileSystem fs) - throws IOException { - - fs.delete(DATA_DIR, true); - fs.delete(WRITE_DIR, true); - - runIOTest(WriteMapper.class, WRITE_DIR); - } - - private static void runIOTest( Class mapperClass, - Path outputDir - ) throws IOException { - JobConf job = new JobConf(fsConfig, TestDFSIO.class); - - FileInputFormat.setInputPaths(job, CONTROL_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(mapperClass); - job.setReducerClass(AccumulatingReducer.class); - - FileOutputFormat.setOutputPath(job, outputDir); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - /** - * Read mapper class. 
- */ - public static class ReadMapper extends IOStatMapper { - - public ReadMapper() { - super(); - } - - public Object doIO(Reporter reporter, - String name, - long totalSize - ) throws IOException { - totalSize *= MEGA; - // open file - DataInputStream in = fs.open(new Path(DATA_DIR, name)); - try { - long actualSize = 0; - for(int curSize = bufferSize; curSize == bufferSize;) { - curSize = in.read(buffer, 0, bufferSize); - actualSize += curSize; - reporter.setStatus("reading " + name + "@" + - actualSize + "/" + totalSize - + " ::host = " + hostName); - } - } finally { - in.close(); - } - return new Long(totalSize); - } - } - - private static void readTest(FileSystem fs) throws IOException { - fs.delete(READ_DIR, true); - runIOTest(ReadMapper.class, READ_DIR); - } - - private static void sequentialTest( - FileSystem fs, - int testType, - int fileSize, - int nrFiles - ) throws Exception { - IOStatMapper ioer = null; - if (testType == TEST_TYPE_READ) - ioer = new ReadMapper(); - else if (testType == TEST_TYPE_WRITE) - ioer = new WriteMapper(); - else - return; - for(int i=0; i < nrFiles; i++) - ioer.doIO(Reporter.NULL, - BASE_FILE_NAME+Integer.toString(i), - MEGA*fileSize); - } - - public static void main(String[] args) { - int testType = TEST_TYPE_READ; - int bufferSize = DEFAULT_BUFFER_SIZE; - int fileSize = 1; - int nrFiles = 1; - String resFileName = DEFAULT_RES_FILE_NAME; - boolean isSequential = false; - - String className = TestDFSIO.class.getSimpleName(); - String version = className + ".0.0.4"; - String usage = "Usage: " + className + " -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] "; - - System.out.println(version); - if (args.length == 0) { - System.err.println(usage); - System.exit(-1); - } - for (int i = 0; i < args.length; i++) { // parse command line - if (args[i].startsWith("-read")) { - testType = TEST_TYPE_READ; - } else if (args[i].equals("-write")) { - testType = TEST_TYPE_WRITE; - } else if (args[i].equals("-clean")) { - testType = TEST_TYPE_CLEANUP; - } else if (args[i].startsWith("-seq")) { - isSequential = true; - } else if (args[i].equals("-nrFiles")) { - nrFiles = Integer.parseInt(args[++i]); - } else if (args[i].equals("-fileSize")) { - fileSize = Integer.parseInt(args[++i]); - } else if (args[i].equals("-bufferSize")) { - bufferSize = Integer.parseInt(args[++i]); - } else if (args[i].equals("-resFile")) { - resFileName = args[++i]; - } - } - - LOG.info("nrFiles = " + nrFiles); - LOG.info("fileSize (MB) = " + fileSize); - LOG.info("bufferSize = " + bufferSize); - - try { - fsConfig.setInt("test.io.file.buffer.size", bufferSize); - FileSystem fs = FileSystem.get(fsConfig); - - if (isSequential) { - long tStart = System.currentTimeMillis(); - sequentialTest(fs, testType, fileSize, nrFiles); - long execTime = System.currentTimeMillis() - tStart; - String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000; - LOG.info(resultLine); - return; - } - if (testType == TEST_TYPE_CLEANUP) { - cleanup(fs); - return; - } - createControlFile(fs, fileSize, nrFiles); - long tStart = System.currentTimeMillis(); - if (testType == TEST_TYPE_WRITE) - writeTest(fs); - if (testType == TEST_TYPE_READ) - readTest(fs); - long execTime = System.currentTimeMillis() - tStart; - - analyzeResult(fs, testType, execTime, resFileName); - } catch(Exception e) { - System.err.print(StringUtils.stringifyException(e)); - System.exit(-1); - } - } - - private static void analyzeResult( FileSystem fs, - int testType, - long execTime, - 
String resFileName - ) throws IOException { - Path reduceFile; - if (testType == TEST_TYPE_WRITE) - reduceFile = new Path(WRITE_DIR, "part-00000"); - else - reduceFile = new Path(READ_DIR, "part-00000"); - DataInputStream in; - in = new DataInputStream(fs.open(reduceFile)); - - BufferedReader lines; - lines = new BufferedReader(new InputStreamReader(in)); - long tasks = 0; - long size = 0; - long time = 0; - float rate = 0; - float sqrate = 0; - String line; - while((line = lines.readLine()) != null) { - StringTokenizer tokens = new StringTokenizer(line, " \t\n\r\f%"); - String attr = tokens.nextToken(); - if (attr.endsWith(":tasks")) - tasks = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":size")) - size = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":time")) - time = Long.parseLong(tokens.nextToken()); - else if (attr.endsWith(":rate")) - rate = Float.parseFloat(tokens.nextToken()); - else if (attr.endsWith(":sqrate")) - sqrate = Float.parseFloat(tokens.nextToken()); - } - - double med = rate / 1000 / tasks; - double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); - String resultLines[] = { - "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : - (testType == TEST_TYPE_READ) ? "read" : - "unknown"), - " Date & time: " + new Date(System.currentTimeMillis()), - " Number of files: " + tasks, - "Total MBytes processed: " + size/MEGA, - " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), - "Average IO rate mb/sec: " + med, - " IO rate std deviation: " + stdDev, - " Test exec time sec: " + (float)execTime / 1000, - "" }; - - PrintStream res = new PrintStream( - new FileOutputStream( - new File(resFileName), true)); - for(int i = 0; i < resultLines.length; i++) { - LOG.info(resultLines[i]); - res.println(resultLines[i]); - } - } - - private static void cleanup(FileSystem fs) throws IOException { - LOG.info("Cleaning up test files"); - fs.delete(new Path(TEST_ROOT_DIR), true); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java deleted file mode 100644 index c83993d0ca..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java +++ /dev/null @@ -1,629 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Random; -import java.util.List; -import java.util.ArrayList; -import java.util.Set; -import java.util.HashSet; -import java.util.Map; -import java.util.HashMap; -import java.net.InetSocketAddress; -import java.net.URI; - -import junit.framework.TestCase; - -import org.apache.commons.logging.Log; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.fs.shell.CommandFormat; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.mapred.lib.LongSumReducer; -import org.apache.hadoop.security.UnixUserGroupInformation; - -public class TestFileSystem extends TestCase { - private static final Log LOG = FileSystem.LOG; - - private static Configuration conf = new Configuration(); - private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096); - - private static final long MEGA = 1024 * 1024; - private static final int SEEKS_PER_FILE = 4; - - private static String ROOT = System.getProperty("test.build.data","fs_test"); - private static Path CONTROL_DIR = new Path(ROOT, "fs_control"); - private static Path WRITE_DIR = new Path(ROOT, "fs_write"); - private static Path READ_DIR = new Path(ROOT, "fs_read"); - private static Path DATA_DIR = new Path(ROOT, "fs_data"); - - public void testFs() throws Exception { - testFs(10 * MEGA, 100, 0); - } - - public static void testFs(long megaBytes, int numFiles, long seed) - throws Exception { - - FileSystem fs = FileSystem.get(conf); - - if (seed == 0) - seed = new Random().nextLong(); - - LOG.info("seed = "+seed); - - createControlFile(fs, megaBytes, numFiles, seed); - writeTest(fs, false); - readTest(fs, false); - seekTest(fs, false); - fs.delete(CONTROL_DIR, true); - fs.delete(DATA_DIR, true); - fs.delete(WRITE_DIR, true); - fs.delete(READ_DIR, true); - } - - public static void testCommandFormat() throws Exception { - // This should go to TestFsShell.java when it is added. 
- CommandFormat cf; - cf= new CommandFormat("copyToLocal", 2,2,"crc","ignoreCrc"); - assertEquals(cf.parse(new String[] {"-get","file", "-"}, 1).get(1), "-"); - assertEquals(cf.parse(new String[] {"-get","file","-ignoreCrc","/foo"}, 1).get(1),"/foo"); - cf = new CommandFormat("tail", 1, 1, "f"); - assertEquals(cf.parse(new String[] {"-tail","fileName"}, 1).get(0),"fileName"); - assertEquals(cf.parse(new String[] {"-tail","-f","fileName"}, 1).get(0),"fileName"); - cf = new CommandFormat("setrep", 2, 2, "R", "w"); - assertEquals(cf.parse(new String[] {"-setrep","-R","2","/foo/bar"}, 1).get(1), "/foo/bar"); - cf = new CommandFormat("put", 2, 10000); - assertEquals(cf.parse(new String[] {"-put", "-", "dest"}, 1).get(1), "dest"); - } - - public static void createControlFile(FileSystem fs, - long megaBytes, int numFiles, - long seed) throws Exception { - - LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files"); - - Path controlFile = new Path(CONTROL_DIR, "files"); - fs.delete(controlFile, true); - Random random = new Random(seed); - - SequenceFile.Writer writer = - SequenceFile.createWriter(fs, conf, controlFile, - Text.class, LongWritable.class, CompressionType.NONE); - - long totalSize = 0; - long maxSize = ((megaBytes / numFiles) * 2) + 1; - try { - while (totalSize < megaBytes) { - Text name = new Text(Long.toString(random.nextLong())); - - long size = random.nextLong(); - if (size < 0) - size = -size; - size = size % maxSize; - - //LOG.info(" adding: name="+name+" size="+size); - - writer.append(name, new LongWritable(size)); - - totalSize += size; - } - } finally { - writer.close(); - } - LOG.info("created control file for: "+totalSize+" bytes"); - } - - public static class WriteMapper extends Configured - implements Mapper { - - private Random random = new Random(); - private byte[] buffer = new byte[BUFFER_SIZE]; - private FileSystem fs; - private boolean fastCheck; - - // a random suffix per task - private String suffix = "-"+random.nextLong(); - - { - try { - fs = FileSystem.get(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public WriteMapper() { super(null); } - - public WriteMapper(Configuration conf) { super(conf); } - - public void configure(JobConf job) { - setConf(job); - fastCheck = job.getBoolean("fs.test.fastCheck", false); - } - - public void map(Text key, LongWritable value, - OutputCollector collector, - Reporter reporter) - throws IOException { - - String name = key.toString(); - long size = value.get(); - long seed = Long.parseLong(name); - - random.setSeed(seed); - reporter.setStatus("creating " + name); - - // write to temp file initially to permit parallel execution - Path tempFile = new Path(DATA_DIR, name+suffix); - OutputStream out = fs.create(tempFile); - - long written = 0; - try { - while (written < size) { - if (fastCheck) { - Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE)); - } else { - random.nextBytes(buffer); - } - long remains = size - written; - int length = (remains<=buffer.length) ? 
(int)remains : buffer.length; - out.write(buffer, 0, length); - written += length; - reporter.setStatus("writing "+name+"@"+written+"/"+size); - } - } finally { - out.close(); - } - // rename to final location - fs.rename(tempFile, new Path(DATA_DIR, name)); - - collector.collect(new Text("bytes"), new LongWritable(written)); - - reporter.setStatus("wrote " + name); - } - - public void close() { - } - - } - - public static void writeTest(FileSystem fs, boolean fastCheck) - throws Exception { - - fs.delete(DATA_DIR, true); - fs.delete(WRITE_DIR, true); - - JobConf job = new JobConf(conf, TestFileSystem.class); - job.setBoolean("fs.test.fastCheck", fastCheck); - - FileInputFormat.setInputPaths(job, CONTROL_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(WriteMapper.class); - job.setReducerClass(LongSumReducer.class); - - FileOutputFormat.setOutputPath(job, WRITE_DIR); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - public static class ReadMapper extends Configured - implements Mapper { - - private Random random = new Random(); - private byte[] buffer = new byte[BUFFER_SIZE]; - private byte[] check = new byte[BUFFER_SIZE]; - private FileSystem fs; - private boolean fastCheck; - - { - try { - fs = FileSystem.get(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public ReadMapper() { super(null); } - - public ReadMapper(Configuration conf) { super(conf); } - - public void configure(JobConf job) { - setConf(job); - fastCheck = job.getBoolean("fs.test.fastCheck", false); - } - - public void map(Text key, LongWritable value, - OutputCollector collector, - Reporter reporter) - throws IOException { - - String name = key.toString(); - long size = value.get(); - long seed = Long.parseLong(name); - - random.setSeed(seed); - reporter.setStatus("opening " + name); - - DataInputStream in = - new DataInputStream(fs.open(new Path(DATA_DIR, name))); - - long read = 0; - try { - while (read < size) { - long remains = size - read; - int n = (remains<=buffer.length) ? 
(int)remains : buffer.length; - in.readFully(buffer, 0, n); - read += n; - if (fastCheck) { - Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE)); - } else { - random.nextBytes(check); - } - if (n != buffer.length) { - Arrays.fill(buffer, n, buffer.length, (byte)0); - Arrays.fill(check, n, check.length, (byte)0); - } - assertTrue(Arrays.equals(buffer, check)); - - reporter.setStatus("reading "+name+"@"+read+"/"+size); - - } - } finally { - in.close(); - } - - collector.collect(new Text("bytes"), new LongWritable(read)); - - reporter.setStatus("read " + name); - } - - public void close() { - } - - } - - public static void readTest(FileSystem fs, boolean fastCheck) - throws Exception { - - fs.delete(READ_DIR, true); - - JobConf job = new JobConf(conf, TestFileSystem.class); - job.setBoolean("fs.test.fastCheck", fastCheck); - - - FileInputFormat.setInputPaths(job, CONTROL_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(ReadMapper.class); - job.setReducerClass(LongSumReducer.class); - - FileOutputFormat.setOutputPath(job, READ_DIR); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - - public static class SeekMapper extends Configured - implements Mapper { - - private Random random = new Random(); - private byte[] check = new byte[BUFFER_SIZE]; - private FileSystem fs; - private boolean fastCheck; - - { - try { - fs = FileSystem.get(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public SeekMapper() { super(null); } - - public SeekMapper(Configuration conf) { super(conf); } - - public void configure(JobConf job) { - setConf(job); - fastCheck = job.getBoolean("fs.test.fastCheck", false); - } - - public void map(Text key, LongWritable value, - OutputCollector collector, - Reporter reporter) - throws IOException { - String name = key.toString(); - long size = value.get(); - long seed = Long.parseLong(name); - - if (size == 0) return; - - reporter.setStatus("opening " + name); - - FSDataInputStream in = fs.open(new Path(DATA_DIR, name)); - - try { - for (int i = 0; i < SEEKS_PER_FILE; i++) { - // generate a random position - long position = Math.abs(random.nextLong()) % size; - - // seek file to that position - reporter.setStatus("seeking " + name); - in.seek(position); - byte b = in.readByte(); - - // check that byte matches - byte checkByte = 0; - // advance random state to that position - random.setSeed(seed); - for (int p = 0; p <= position; p+= check.length) { - reporter.setStatus("generating data for " + name); - if (fastCheck) { - checkByte = (byte)random.nextInt(Byte.MAX_VALUE); - } else { - random.nextBytes(check); - checkByte = check[(int)(position % check.length)]; - } - } - assertEquals(b, checkByte); - } - } finally { - in.close(); - } - } - - public void close() { - } - - } - - public static void seekTest(FileSystem fs, boolean fastCheck) - throws Exception { - - fs.delete(READ_DIR, true); - - JobConf job = new JobConf(conf, TestFileSystem.class); - job.setBoolean("fs.test.fastCheck", fastCheck); - - FileInputFormat.setInputPaths(job,CONTROL_DIR); - job.setInputFormat(SequenceFileInputFormat.class); - - job.setMapperClass(SeekMapper.class); - job.setReducerClass(LongSumReducer.class); - - FileOutputFormat.setOutputPath(job, READ_DIR); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); - JobClient.runJob(job); - } - - - public static void main(String[] args) 
throws Exception { - int megaBytes = 10; - int files = 100; - boolean noRead = false; - boolean noWrite = false; - boolean noSeek = false; - boolean fastCheck = false; - long seed = new Random().nextLong(); - - String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]"; - - if (args.length == 0) { - System.err.println(usage); - System.exit(-1); - } - for (int i = 0; i < args.length; i++) { // parse command line - if (args[i].equals("-files")) { - files = Integer.parseInt(args[++i]); - } else if (args[i].equals("-megaBytes")) { - megaBytes = Integer.parseInt(args[++i]); - } else if (args[i].equals("-noread")) { - noRead = true; - } else if (args[i].equals("-nowrite")) { - noWrite = true; - } else if (args[i].equals("-noseek")) { - noSeek = true; - } else if (args[i].equals("-fastcheck")) { - fastCheck = true; - } - } - - LOG.info("seed = "+seed); - LOG.info("files = " + files); - LOG.info("megaBytes = " + megaBytes); - - FileSystem fs = FileSystem.get(conf); - - if (!noWrite) { - createControlFile(fs, megaBytes*MEGA, files, seed); - writeTest(fs, fastCheck); - } - if (!noRead) { - readTest(fs, fastCheck); - } - if (!noSeek) { - seekTest(fs, fastCheck); - } - } - - static Configuration createConf4Testing(String username) throws Exception { - Configuration conf = new Configuration(); - UnixUserGroupInformation.saveToConf(conf, - UnixUserGroupInformation.UGI_PROPERTY_NAME, - new UnixUserGroupInformation(username, new String[]{"group"})); - return conf; - } - - public void testFsCache() throws Exception { - { - long now = System.currentTimeMillis(); - Configuration[] conf = {new Configuration(), - createConf4Testing("foo" + now), createConf4Testing("bar" + now)}; - FileSystem[] fs = new FileSystem[conf.length]; - - for(int i = 0; i < conf.length; i++) { - fs[i] = FileSystem.get(conf[i]); - assertEquals(fs[i], FileSystem.get(conf[i])); - for(int j = 0; j < i; j++) { - assertFalse(fs[j] == fs[i]); - } - } - FileSystem.closeAll(); - } - - { - try { - runTestCache(NameNode.DEFAULT_PORT); - } catch(java.net.BindException be) { - LOG.warn("Cannot test NameNode.DEFAULT_PORT (=" - + NameNode.DEFAULT_PORT + ")", be); - } - - runTestCache(0); - } - } - - static void runTestCache(int port) throws Exception { - Configuration conf = new Configuration(); - MiniDFSCluster cluster = null; - try { - cluster = new MiniDFSCluster(port, conf, 2, true, true, null, null); - URI uri = cluster.getFileSystem().getUri(); - LOG.info("uri=" + uri); - - { - FileSystem fs = FileSystem.get(uri, new Configuration()); - checkPath(cluster, fs); - for(int i = 0; i < 100; i++) { - assertTrue(fs == FileSystem.get(uri, new Configuration())); - } - } - - if (port == NameNode.DEFAULT_PORT) { - //test explicit default port - URI uri2 = new URI(uri.getScheme(), uri.getUserInfo(), - uri.getHost(), NameNode.DEFAULT_PORT, uri.getPath(), - uri.getQuery(), uri.getFragment()); - LOG.info("uri2=" + uri2); - FileSystem fs = FileSystem.get(uri2, conf); - checkPath(cluster, fs); - for(int i = 0; i < 100; i++) { - assertTrue(fs == FileSystem.get(uri2, new Configuration())); - } - } - } finally { - if (cluster != null) cluster.shutdown(); - } - } - - static void checkPath(MiniDFSCluster cluster, FileSystem fileSys) throws IOException { - InetSocketAddress add = cluster.getNameNode().getNameNodeAddress(); - // Test upper/lower case - fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort())); - } - - public void testFsClose() throws Exception { - { - Configuration 
conf = new Configuration(); - new Path("file:///").getFileSystem(conf); - UnixUserGroupInformation.login(conf, true); - FileSystem.closeAll(); - } - - { - Configuration conf = new Configuration(); - new Path("hftp://localhost:12345/").getFileSystem(conf); - UnixUserGroupInformation.login(conf, true); - FileSystem.closeAll(); - } - - { - Configuration conf = new Configuration(); - FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf); - UnixUserGroupInformation.login(fs.getConf(), true); - FileSystem.closeAll(); - } - } - - - public void testCacheKeysAreCaseInsensitive() - throws Exception - { - Configuration conf = new Configuration(); - - // check basic equality - FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf); - FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf); - assertEquals( lowercaseCachekey1, lowercaseCachekey2 ); - - // check insensitive equality - FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HFTP://Localhost:12345/"), conf); - assertEquals( lowercaseCachekey2, uppercaseCachekey ); - - // check behaviour with collections - List list = new ArrayList(); - list.add(uppercaseCachekey); - assertTrue(list.contains(uppercaseCachekey)); - assertTrue(list.contains(lowercaseCachekey2)); - - Set set = new HashSet(); - set.add(uppercaseCachekey); - assertTrue(set.contains(uppercaseCachekey)); - assertTrue(set.contains(lowercaseCachekey2)); - - Map map = new HashMap(); - map.put(uppercaseCachekey, ""); - assertTrue(map.containsKey(uppercaseCachekey)); - assertTrue(map.containsKey(lowercaseCachekey2)); - - } - - public static void testFsUniqueness(long megaBytes, int numFiles, long seed) - throws Exception { - - // multiple invocations of FileSystem.get return the same object. - FileSystem fs1 = FileSystem.get(conf); - FileSystem fs2 = FileSystem.get(conf); - assertTrue(fs1 == fs2); - - // multiple invocations of FileSystem.newInstance return different objects - fs1 = FileSystem.newInstance(conf); - fs2 = FileSystem.newInstance(conf); - assertTrue(fs1 != fs2 && !fs1.equals(fs2)); - fs1.close(); - fs2.close(); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java b/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java deleted file mode 100644 index f9a10c88a3..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs; - -import java.io.IOException; -import java.util.Iterator; - -import junit.framework.TestCase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.tools.HadoopArchives; -import org.apache.hadoop.util.ToolRunner; - -/** - * test the har file system - * create a har filesystem - * run fs commands - * and then run a map reduce job - */ -public class TestHarFileSystem extends TestCase { - private Path inputPath; - private MiniDFSCluster dfscluster; - private MiniMRCluster mapred; - private FileSystem fs; - private Path filea, fileb, filec; - private Path archivePath; - - protected void setUp() throws Exception { - super.setUp(); - dfscluster = new MiniDFSCluster(new JobConf(), 2, true, null); - fs = dfscluster.getFileSystem(); - mapred = new MiniMRCluster(2, fs.getUri().toString(), 1); - inputPath = new Path(fs.getHomeDirectory(), "test"); - filea = new Path(inputPath,"a"); - fileb = new Path(inputPath,"b"); - filec = new Path(inputPath,"c"); - archivePath = new Path(fs.getHomeDirectory(), "tmp"); - } - - protected void tearDown() throws Exception { - try { - if (mapred != null) { - mapred.shutdown(); - } - if (dfscluster != null) { - dfscluster.shutdown(); - } - } catch(Exception e) { - System.err.println(e); - } - super.tearDown(); - } - - static class TextMapperReducer implements Mapper, - Reducer { - - public void configure(JobConf conf) { - //do nothing - } - - public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { - output.collect(value, new Text("")); - } - - public void close() throws IOException { - // do nothing - } - - public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { - while(values.hasNext()) { - values.next(); - output.collect(key, null); - } - } - } - - public void testArchives() throws Exception { - fs.mkdirs(inputPath); - - FSDataOutputStream out = fs.create(filea); - out.write("a".getBytes()); - out.close(); - out = fs.create(fileb); - out.write("b".getBytes()); - out.close(); - out = fs.create(filec); - out.write("c".getBytes()); - out.close(); - Configuration conf = mapred.createJobConf(); - HadoopArchives har = new HadoopArchives(conf); - String[] args = new String[3]; - //check for destination not specfied - args[0] = "-archiveName"; - args[1] = "foo.har"; - args[2] = inputPath.toString(); - int ret = ToolRunner.run(har, args); - assertTrue(ret != 0); - args = new String[4]; - //check for wrong archiveName - args[0] = "-archiveName"; - args[1] = "/d/foo.har"; - args[2] = inputPath.toString(); - args[3] = archivePath.toString(); - ret = ToolRunner.run(har, args); - assertTrue(ret != 0); -// se if dest is a file - args[1] = "foo.har"; - args[3] = filec.toString(); - ret = ToolRunner.run(har, args); - assertTrue(ret != 0); - //this is a valid run - args[0] = "-archiveName"; - args[1] = "foo.har"; - args[2] = inputPath.toString(); - args[3] = archivePath.toString(); - ret = ToolRunner.run(har, args); - //checl for the existenece of the archive - assertTrue(ret == 0); - ///try running it again. 
it should not - // override the directory - ret = ToolRunner.run(har, args); - assertTrue(ret != 0); - Path finalPath = new Path(archivePath, "foo.har"); - Path fsPath = new Path(inputPath.toUri().getPath()); - String relative = fsPath.toString().substring(1); - Path filePath = new Path(finalPath, relative); - //make it a har path - Path harPath = new Path("har://" + filePath.toUri().getPath()); - assertTrue(fs.exists(new Path(finalPath, "_index"))); - assertTrue(fs.exists(new Path(finalPath, "_masterindex"))); - assertTrue(!fs.exists(new Path(finalPath, "_logs"))); - //creation tested - //check if the archive is same - // do ls and cat on all the files - FsShell shell = new FsShell(conf); - args = new String[2]; - args[0] = "-ls"; - args[1] = harPath.toString(); - ret = ToolRunner.run(shell, args); - // ls should work. - assertTrue((ret == 0)); - //now check for contents of filea - // fileb and filec - Path harFilea = new Path(harPath, "a"); - Path harFileb = new Path(harPath, "b"); - Path harFilec = new Path(harPath, "c"); - FileSystem harFs = harFilea.getFileSystem(conf); - FSDataInputStream fin = harFs.open(harFilea); - byte[] b = new byte[4]; - int readBytes = fin.read(b); - assertTrue("Empty read.", readBytes > 0); - fin.close(); - assertTrue("strings are equal ", (b[0] == "a".getBytes()[0])); - fin = harFs.open(harFileb); - readBytes = fin.read(b); - assertTrue("Empty read.", readBytes > 0); - fin.close(); - assertTrue("strings are equal ", (b[0] == "b".getBytes()[0])); - fin = harFs.open(harFilec); - readBytes = fin.read(b); - assertTrue("Empty read.", readBytes > 0); - fin.close(); - assertTrue("strings are equal ", (b[0] == "c".getBytes()[0])); - // ok all files match - // run a map reduce job - Path outdir = new Path(fs.getHomeDirectory(), "mapout"); - JobConf jobconf = mapred.createJobConf(); - FileInputFormat.addInputPath(jobconf, harPath); - jobconf.setInputFormat(TextInputFormat.class); - jobconf.setOutputFormat(TextOutputFormat.class); - FileOutputFormat.setOutputPath(jobconf, outdir); - jobconf.setMapperClass(TextMapperReducer.class); - jobconf.setMapOutputKeyClass(Text.class); - jobconf.setMapOutputValueClass(Text.class); - jobconf.setReducerClass(TextMapperReducer.class); - jobconf.setNumReduceTasks(1); - JobClient.runJob(jobconf); - args[1] = outdir.toString(); - ret = ToolRunner.run(shell, args); - - FileStatus[] status = fs.globStatus(new Path(outdir, "part*")); - Path reduceFile = status[0].getPath(); - FSDataInputStream reduceIn = fs.open(reduceFile); - b = new byte[6]; - readBytes = reduceIn.read(b); - assertTrue("Should read 6 bytes.", readBytes == 6); - //assuming all the 6 bytes were read. - Text readTxt = new Text(b); - assertTrue("a\nb\nc\n".equals(readTxt.toString())); - assertTrue("number of bytes left should be -1", reduceIn.read(b) == -1); - reduceIn.close(); - } -} diff --git a/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java b/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java deleted file mode 100644 index 22b8679fb7..0000000000 --- a/src/test/hdfs-with-mr/org/apache/hadoop/hdfs/NNBench.java +++ /dev/null @@ -1,964 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.util.Date; -import java.io.DataInputStream; -import java.io.FileOutputStream; -import java.io.InputStreamReader; -import java.io.PrintStream; -import java.io.File; -import java.io.BufferedReader; -import java.util.StringTokenizer; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Iterator; - -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.Log; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; - -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile.CompressionType; -import org.apache.hadoop.io.SequenceFile; - -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reducer; - -/** - * This program executes a specified operation that applies load to - * the NameNode. - * - * When run simultaneously on multiple nodes, this program functions - * as a stress-test and benchmark for namenode, especially when - * the number of bytes written to each file is small. - * - * Valid operations are: - * create_write - * open_read - * rename - * delete - * - * NOTE: The open_read, rename and delete operations assume that the files - * they operate on are already available. The create_write operation - * must be run before running the other operations. 
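As an aside (not part of the deleted file): because of that ordering constraint, a benchmark run drives the four operations in sequence, create_write first. The wrapper below is a minimal sketch assuming NNBench keeps its usual main(String[]) entry point and that the option names mirror the fields and usage text in this file; the numeric values are placeholders.

    // Hypothetical driver: open_read, rename and delete consume the files that
    // create_write produced under -baseDir, so create_write must finish first.
    public class NNBenchSequenceSketch {
      public static void main(String[] args) throws Exception {
        String[] common = {"-maps", "12", "-reduces", "6",
                           "-numberOfFiles", "1000",
                           "-baseDir", "/benchmarks/NNBench"};
        String[] ops = {"create_write", "open_read", "rename", "delete"};
        for (String op : ops) {
          String[] run = new String[common.length + 2];
          run[0] = "-operation";
          run[1] = op;
          System.arraycopy(common, 0, run, 2, common.length);
          org.apache.hadoop.hdfs.NNBench.main(run);
        }
      }
    }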
- */ - -public class NNBench { - private static final Log LOG = LogFactory.getLog( - "org.apache.hadoop.hdfs.NNBench"); - - protected static String CONTROL_DIR_NAME = "control"; - protected static String OUTPUT_DIR_NAME = "output"; - protected static String DATA_DIR_NAME = "data"; - protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log"; - protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4"; - - public static String operation = "none"; - public static long numberOfMaps = 1l; // default is 1 - public static long numberOfReduces = 1l; // default is 1 - public static long startTime = - System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min - public static long blockSize = 1l; // default is 1 - public static int bytesToWrite = 0; // default is 0 - public static long bytesPerChecksum = 1l; // default is 1 - public static long numberOfFiles = 1l; // default is 1 - public static short replicationFactorPerFile = 1; // default is 1 - public static String baseDir = "/benchmarks/NNBench"; // default - public static boolean readFileAfterOpen = false; // default is to not read - - // Supported operations - private static final String OP_CREATE_WRITE = "create_write"; - private static final String OP_OPEN_READ = "open_read"; - private static final String OP_RENAME = "rename"; - private static final String OP_DELETE = "delete"; - - // To display in the format that matches the NN and DN log format - // Example: 2007-10-26 00:01:19,853 - static SimpleDateFormat sdf = - new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S"); - - private static Configuration config = new Configuration(); - - /** - * Clean up the files before a test run - * - * @throws IOException on error - */ - private static void cleanupBeforeTestrun() throws IOException { - FileSystem tempFS = FileSystem.get(config); - - // Delete the data directory only if it is the create/write operation - if (operation.equals(OP_CREATE_WRITE)) { - LOG.info("Deleting data directory"); - tempFS.delete(new Path(baseDir, DATA_DIR_NAME), true); - } - tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME), true); - tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME), true); - } - - /** - * Create control files before a test run. - * Number of files created is equal to the number of maps specified - * - * @throws IOException on error - */ - private static void createControlFiles() throws IOException { - FileSystem tempFS = FileSystem.get(config); - LOG.info("Creating " + numberOfMaps + " control files"); - - for (int i = 0; i < numberOfMaps; i++) { - String strFileName = "NNBench_Controlfile_" + i; - Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME), - strFileName); - - SequenceFile.Writer writer = null; - try { - writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, - LongWritable.class, CompressionType.NONE); - writer.append(new Text(strFileName), new LongWritable(0l)); - } catch(Exception e) { - throw new IOException(e.getLocalizedMessage()); - } finally { - if (writer != null) { - writer.close(); - } - writer = null; - } - } - } - /** - * Display version - */ - private static void displayVersion() { - System.out.println(NNBENCH_VERSION); - } - - /** - * Display usage - */ - private static void displayUsage() { - String usage = - "Usage: nnbench \n" + - "Options:\n" + - "\t-operation \n" + - "\t * NOTE: The open_read, rename and delete operations assume " + - "that the files they operate on, are already available. 
" + - "The create_write operation must be run before running the " + - "other operations.\n" + - "\t-maps \n" + - "\t-reduces \n" + - "\t-startTime