From e8f12457c145da7c7d2de173eb1f0beed3c5676f Mon Sep 17 00:00:00 2001
From: Todd Lipcon
Date: Mon, 25 Jul 2011 21:42:04 +0000
Subject: [PATCH] MAPREDUCE-2602. Allow setting of end-of-record delimiter for
 TextInputFormat for the old API. Contributed by Ahmed Radwan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1150926 13f79535-47bb-0310-9956-ffa450edef68
---
 mapreduce/CHANGES.txt                              |   3 +
 .../hadoop/mapred/LineRecordReader.java            |  43 ++++--
 .../apache/hadoop/mapred/TextInputFormat.java      |   6 +-
 .../hadoop/mapred/TestLineRecordReader.java        | 139 ++++++++++++++++++
 4 files changed, 181 insertions(+), 10 deletions(-)
 create mode 100644 mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLineRecordReader.java

diff --git a/mapreduce/CHANGES.txt b/mapreduce/CHANGES.txt
index 54f039df1c..ef9aff0aa7 100644
--- a/mapreduce/CHANGES.txt
+++ b/mapreduce/CHANGES.txt
@@ -203,6 +203,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2623. Update ClusterMapReduceTestCase to use
     MiniDFSCluster.Builder (Harsh J Chouraria via eli)
 
+    MAPREDUCE-2602. Allow setting of end-of-record delimiter for
+    TextInputFormat for the old API. (Ahmed Radwan via todd)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/LineRecordReader.java b/mapreduce/src/java/org/apache/hadoop/mapred/LineRecordReader.java
index b9506df44d..0da0f75090 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -77,10 +77,25 @@ public static class LineReader extends org.apache.hadoop.util.LineReader {
     public LineReader(InputStream in, Configuration conf) throws IOException {
       super(in, conf);
     }
+    LineReader(InputStream in, byte[] recordDelimiter) {
+      super(in, recordDelimiter);
+    }
+    LineReader(InputStream in, int bufferSize, byte[] recordDelimiter) {
+      super(in, bufferSize, recordDelimiter);
+    }
+    public LineReader(InputStream in, Configuration conf,
+        byte[] recordDelimiter) throws IOException {
+      super(in, conf, recordDelimiter);
+    }
   }
 
   public LineRecordReader(Configuration job, FileSplit split)
       throws IOException {
+    this(job, split, null);
+  }
+
+  public LineRecordReader(Configuration job, FileSplit split,
+      byte[] recordDelimiter) throws IOException {
     this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
       LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
     start = split.getStart();
@@ -99,17 +114,17 @@ public LineRecordReader(Configuration job,
           ((SplittableCompressionCodec)codec).createInputStream(
             fileIn, decompressor, start, end,
             SplittableCompressionCodec.READ_MODE.BYBLOCK);
-        in = new LineReader(cIn, job);
+        in = new LineReader(cIn, job, recordDelimiter);
         start = cIn.getAdjustedStart();
         end = cIn.getAdjustedEnd();
         filePosition = cIn; // take pos from compressed stream
       } else {
-        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job, recordDelimiter);
         filePosition = fileIn;
       }
     } else {
       fileIn.seek(start);
-      in = new LineReader(fileIn, job);
+      in = new LineReader(fileIn, job, recordDelimiter);
       filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
@@ -120,29 +135,40 @@ public LineRecordReader(Configuration job,
     }
     this.pos = start;
   }
-  
+
   public LineRecordReader(InputStream in, long offset, long endOffset,
                           int maxLineLength) {
+    this(in, offset, endOffset, maxLineLength, null);
+  }
+
+  public LineRecordReader(InputStream in, long offset, long endOffset,
+      int maxLineLength, byte[] recordDelimiter) {
     this.maxLineLength = maxLineLength;
-    this.in = new LineReader(in);
+    this.in = new LineReader(in, recordDelimiter);
     this.start = offset;
     this.pos = offset;
    
 this.end = endOffset;
     filePosition = null;
   }
 
+  public LineRecordReader(InputStream in, long offset, long endOffset,
+                          Configuration job)
+    throws IOException{
+    this(in, offset, endOffset, job, null);
+  }
+
   public LineRecordReader(InputStream in, long offset, long endOffset,
-                          Configuration job)
+                          Configuration job, byte[] recordDelimiter)
     throws IOException{
     this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
       LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
-    this.in = new LineReader(in, job);
+    this.in = new LineReader(in, job, recordDelimiter);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;
     filePosition = null;
   }
-  
+
   public LongWritable createKey() {
     return new LongWritable();
   }
@@ -171,7 +197,6 @@ private long getFilePosition() throws IOException {
     return retVal;
   }
 
-  
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
diff --git a/mapreduce/src/java/org/apache/hadoop/mapred/TextInputFormat.java b/mapreduce/src/java/org/apache/hadoop/mapred/TextInputFormat.java
index 2ab61ec6a2..d90a0748be 100644
--- a/mapreduce/src/java/org/apache/hadoop/mapred/TextInputFormat.java
+++ b/mapreduce/src/java/org/apache/hadoop/mapred/TextInputFormat.java
@@ -59,6 +59,10 @@ public RecordReader<LongWritable, Text> getRecordReader(
     throws IOException {
 
     reporter.setStatus(genericSplit.toString());
-    return new LineRecordReader(job, (FileSplit) genericSplit);
+    String delimiter = job.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();
+    return new LineRecordReader(job, (FileSplit) genericSplit,
+        recordDelimiterBytes);
   }
 }
diff --git a/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLineRecordReader.java b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLineRecordReader.java
new file mode 100644
index 0000000000..472da68c8a
--- /dev/null
+++ b/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.tools.ant.util.FileUtils;
+import org.junit.Test;
+
+public class TestLineRecordReader extends TestCase {
+
+  private static Path workDir = new Path(new Path(System.getProperty(
+      "test.build.data", "."), "data"), "TestTextInputFormat");
+  private static Path inputDir = new Path(workDir, "input");
+  private static Path outputDir = new Path(workDir, "output");
+
+  /**
+   * Writes the input test file
+   *
+   * @param conf
+   * @throws IOException
+   */
+  public void createInputFile(Configuration conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(inputDir, "test.txt");
+    Writer writer = new OutputStreamWriter(localFs.create(file));
+    writer.write("abc\ndef\t\nghi\njkl");
+    writer.close();
+  }
+
+  /**
+   * Reads the output file into a string
+   *
+   * @param conf
+   * @return
+   * @throws IOException
+   */
+  public String readOutputFile(Configuration conf) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    Path file = new Path(outputDir, "part-00000");
+    Reader reader = new InputStreamReader(localFs.open(file));
+    String r = FileUtils.readFully(reader);
+    reader.close();
+    return r;
+  }
+
+  /**
+   * Creates and runs an MR job
+   *
+   * @param conf
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public void createAndRunJob(Configuration conf) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    JobConf job = new JobConf(conf);
+    job.setJarByClass(TestLineRecordReader.class);
+    job.setMapperClass(IdentityMapper.class);
+    job.setReducerClass(IdentityReducer.class);
+    FileInputFormat.addInputPath(job, inputDir);
+    FileOutputFormat.setOutputPath(job, outputDir);
+    JobClient.runJob(job);
+  }
+
+  /**
+   * Test the case when a custom record delimiter is specified using the
+   * textinputformat.record.delimiter configuration property
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testCustomRecordDelimiters() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("textinputformat.record.delimiter", "\t\n");
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // cleanup
+    localFs.delete(workDir, true);
+    // creating input test file
+    createInputFile(conf);
+    createAndRunJob(conf);
+    String expected = "0\tabc\ndef\n9\tghi\njkl\n";
+    this.assertEquals(expected, readOutputFile(conf));
+  }
+
+  /**
+   * Test the default behavior when the textinputformat.record.delimiter
+   * configuration property is not specified
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testDefaultRecordDelimiters() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // cleanup
+    localFs.delete(workDir, true);
+    // creating input test file
+    createInputFile(conf);
+    createAndRunJob(conf);
+    String expected = "0\tabc\n4\tdef\t\n9\tghi\n13\tjkl\n";
+    this.assertEquals(expected, readOutputFile(conf));
+  }
+
+}
\ No newline at end of file
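
Usage note (not part of the patch): with this change applied, an old-API job opts into a custom end-of-record delimiter simply by setting textinputformat.record.delimiter in its JobConf before submission, exactly as the tests above do. A minimal driver sketch follows; the CustomDelimiterDriver class name and the args[] path handling are hypothetical, for illustration only.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

// Hypothetical driver class, for illustration only; any old-API job
// configured the same way would behave identically.
public class CustomDelimiterDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(CustomDelimiterDriver.class);

    // The property added by this patch: TextInputFormat reads it in
    // getRecordReader() and hands its bytes to LineRecordReader, so each
    // record now ends at "\t\n" rather than at a newline.
    job.set("textinputformat.record.delimiter", "\t\n");

    job.setInputFormat(TextInputFormat.class);
    job.setMapperClass(IdentityMapper.class);
    job.setReducerClass(IdentityReducer.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    JobClient.runJob(job);
  }
}

One caveat worth noting: TextInputFormat converts the property with String.getBytes(), i.e. the platform default charset, so a non-ASCII delimiter is only portable when that charset matches the input file's encoding.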