From ef01768333ec0e59e7d747864183835e756a7bf6 Mon Sep 17 00:00:00 2001 From: yliu Date: Sun, 8 Feb 2015 02:43:43 +0800 Subject: [PATCH] MAPREDUCE-6227. DFSIO for truncate. (shv via yliu) --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../java/org/apache/hadoop/fs/TestDFSIO.java | 91 +++++++++++++++++-- 2 files changed, 84 insertions(+), 9 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5dc083a988..583c6c166c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -274,6 +274,8 @@ Release 2.7.0 - UNRELEASED MAPREDUCE-5800. Use Job#getInstance instead of deprecated constructors (aajisaka) + MAPREDUCE-6227. DFSIO for truncate. (shv via yliu) + OPTIMIZATIONS MAPREDUCE-6169. MergeQueue should release reference to the current item diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index 78f1ffa684..d9cd07ba05 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -31,7 +31,6 @@ import java.util.Date; import java.util.Random; import java.util.StringTokenizer; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -92,13 +91,13 @@ public class TestDFSIO implements Tool { private static final String BASE_FILE_NAME = "test_io_"; private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log"; private static final long MEGA = ByteMultiple.MB.value(); - private static final int DEFAULT_NR_BYTES = 1; + private static final int DEFAULT_NR_BYTES = 128; private static final int DEFAULT_NR_FILES = 4; private static final String USAGE = "Usage: " + TestDFSIO.class.getSimpleName() + " [genericOptions]" + " -read [-random | -backward | -skip [-skipSize Size]] |" + - " -write | -append | -clean" + + " -write | -append | -truncate | -clean" + " [-compression codecClassName]" + " [-nrFiles N]" + " [-size Size[B|KB|MB|GB|TB]]" + @@ -120,7 +119,8 @@ private static enum TestType { TEST_TYPE_APPEND("append"), TEST_TYPE_READ_RANDOM("random read"), TEST_TYPE_READ_BACKWARD("backward read"), - TEST_TYPE_READ_SKIP("skip read"); + TEST_TYPE_READ_SKIP("skip read"), + TEST_TYPE_TRUNCATE("truncate"); private String type; @@ -191,6 +191,9 @@ private static Path getAppendDir(Configuration conf) { private static Path getRandomReadDir(Configuration conf) { return new Path(getBaseDir(conf), "io_random_read"); } + private static Path getTruncateDir(Configuration conf) { + return new Path(getBaseDir(conf), "io_truncate"); + } private static Path getDataDir(Configuration conf) { return new Path(getBaseDir(conf), "io_data"); } @@ -201,6 +204,7 @@ private static Path getDataDir(Configuration conf) { @BeforeClass public static void beforeClass() throws Exception { bench = new TestDFSIO(); + bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); cluster = new MiniDFSCluster.Builder(bench.getConf()) .numDataNodes(2) .format(true) @@ -277,6 +281,16 @@ public void testAppend() throws Exception { bench.analyzeResult(fs, TestType.TEST_TYPE_APPEND, execTime); } + @Test (timeout = 60000) + public void testTruncate() throws Exception { + FileSystem fs = cluster.getFileSystem(); + bench.createControlFile(fs, DEFAULT_NR_BYTES / 2, DEFAULT_NR_FILES); + long tStart = System.currentTimeMillis(); + bench.truncateTest(fs); + long execTime = System.currentTimeMillis() - tStart; + bench.analyzeResult(fs, TestType.TEST_TYPE_TRUNCATE, execTime); + } + @SuppressWarnings("deprecation") private void createControlFile(FileSystem fs, long nrBytes, // in bytes @@ -299,9 +313,9 @@ private void createControlFile(FileSystem fs, } catch(Exception e) { throw new IOException(e.getLocalizedMessage()); } finally { - if (writer != null) + if (writer != null) writer.close(); - writer = null; + writer = null; } } LOG.info("created control files for: "+nrFiles+" files"); @@ -611,6 +625,51 @@ private void randomReadTest(FileSystem fs) throws IOException { runIOTest(RandomReadMapper.class, readDir); } + /** + * Truncate mapper class. + * The mapper truncates given file to the newLength, specified by -size. + */ + public static class TruncateMapper extends IOStatMapper { + private static final long DELAY = 100L; + + private Path filePath; + private long fileSize; + + @Override // IOMapperBase + public Closeable getIOStream(String name) throws IOException { + filePath = new Path(getDataDir(getConf()), name); + fileSize = fs.getFileStatus(filePath).getLen(); + return null; + } + + @Override // IOMapperBase + public Long doIO(Reporter reporter, + String name, + long newLength // in bytes + ) throws IOException { + boolean isClosed = fs.truncate(filePath, newLength); + reporter.setStatus("truncating " + name + " to newLength " + + newLength + " ::host = " + hostName); + for(int i = 0; !isClosed; i++) { + try { + Thread.sleep(DELAY); + } catch (InterruptedException ignored) {} + FileStatus status = fs.getFileStatus(filePath); + assert status != null : "status is null"; + isClosed = (status.getLen() == newLength); + reporter.setStatus("truncate recover for " + name + " to newLength " + + newLength + " attempt " + i + " ::host = " + hostName); + } + return Long.valueOf(fileSize - newLength); + } + } + + private void truncateTest(FileSystem fs) throws IOException { + Path TruncateDir = getTruncateDir(config); + fs.delete(TruncateDir, true); + runIOTest(TruncateMapper.class, TruncateDir); + } + private void sequentialTest(FileSystem fs, TestType testType, long fileSize, // in bytes @@ -632,6 +691,9 @@ private void sequentialTest(FileSystem fs, case TEST_TYPE_READ_SKIP: ioer = new RandomReadMapper(); break; + case TEST_TYPE_TRUNCATE: + ioer = new TruncateMapper(); + break; default: return; } @@ -665,7 +727,7 @@ public int run(String[] args) throws IOException { String resFileName = DEFAULT_RES_FILE_NAME; String compressionClass = null; boolean isSequential = false; - String version = TestDFSIO.class.getSimpleName() + ".1.7"; + String version = TestDFSIO.class.getSimpleName() + ".1.8"; LOG.info(version); if (args.length == 0) { @@ -689,6 +751,8 @@ public int run(String[] args) throws IOException { } else if (args[i].equalsIgnoreCase("-skip")) { if (testType != TestType.TEST_TYPE_READ) return -1; testType = TestType.TEST_TYPE_READ_SKIP; + } else if (args[i].equalsIgnoreCase("-truncate")) { + testType = TestType.TEST_TYPE_TRUNCATE; } else if (args[i].equalsIgnoreCase("-clean")) { testType = TestType.TEST_TYPE_CLEANUP; } else if (args[i].toLowerCase().startsWith("-seq")) { @@ -762,6 +826,11 @@ else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) case TEST_TYPE_READ_BACKWARD: case TEST_TYPE_READ_SKIP: randomReadTest(fs); + break; + case TEST_TYPE_TRUNCATE: + truncateTest(fs); + break; + default: } long execTime = System.currentTimeMillis() - tStart; @@ -797,7 +866,7 @@ static float toMB(long bytes) { return ((float)bytes)/MEGA; } - private void analyzeResult( FileSystem fs, + private void analyzeResult( FileSystem fs, TestType testType, long execTime, String resFileName @@ -870,13 +939,17 @@ private Path getReduceFilePath(TestType testType) { case TEST_TYPE_READ_BACKWARD: case TEST_TYPE_READ_SKIP: return new Path(getRandomReadDir(config), "part-00000"); + case TEST_TYPE_TRUNCATE: + return new Path(getTruncateDir(config), "part-00000"); + default: } return null; } private void analyzeResult(FileSystem fs, TestType testType, long execTime) throws IOException { - analyzeResult(fs, testType, execTime, DEFAULT_RES_FILE_NAME); + String dir = System.getProperty("test.build.dir", "target/test-dir"); + analyzeResult(fs, testType, execTime, dir + "/" + DEFAULT_RES_FILE_NAME); } private void cleanup(FileSystem fs)