diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 8de17269cf..dfa82fb98b 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -257,6 +257,9 @@ Release 2.0.5-beta - UNRELEASED MAPREDUCE-5246. Specify application-type at the time of job submission after YARN-563. (Mayank Bansal via vinodkv) + MAPREDUCE-5230. Bring back NLineInputFormat.createFileSplit for binary + compatibility with mapred in 1.x (Mayank Bansal via vinodkv) + OPTIMIZATIONS MAPREDUCE-4974. Optimising the LineRecordReader initialize() method diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java index 7f4a2e5bcc..0245897b07 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; @@ -90,4 +91,21 @@ public InputSplit[] getSplits(JobConf job, int numSplits) public void configure(JobConf conf) { N = conf.getInt("mapreduce.input.lineinputformat.linespermap", 1); } + + /** + * NLineInputFormat uses LineRecordReader, which always reads + * (and consumes) at least one character out of its upper split + * boundary. So to make sure that each mapper gets N lines, we + * move back the upper split limits of each split + * by one character here. + * @param fileName Path of file + * @param begin the position of the first byte in the file to process + * @param length number of bytes in InputSplit + * @return FileSplit + */ + protected static FileSplit createFileSplit(Path fileName, long begin, long length) { + return (begin == 0) + ? new FileSplit(fileName, begin, length - 1, new String[] {}) + : new FileSplit(fileName, begin - 1, length, new String[] {}); + } }