diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java index 900215af4a..153953d273 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LineReader.java @@ -333,6 +333,10 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) //appending the ambiguous characters (refer case 2.2) str.append(recordDelimiterBytes, 0, ambiguousByteCount); ambiguousByteCount = 0; + // since it is now certain that the split did not split a delimiter we + // should not read the next record: clear the flag otherwise duplicate + // records could be generated + unsetNeedAdditionalRecordAfterSplit(); } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); @@ -380,4 +384,9 @@ protected int getBufferPosn() { protected int getBufferSize() { return bufferSize; } + + protected void unsetNeedAdditionalRecordAfterSplit() { + // needed for custom multi byte line delimiters only + // see MAPREDUCE-6549 for details + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c6e80e7b47..503e687179 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -650,6 +650,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of target. (Akira AJISAKA via junping_du) + MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause + duplicate records (wilfreds via rkanter) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java index 38491b0b0b..6d495ef3da 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/UncompressedSplitLineReader.java @@ -97,4 +97,9 @@ public int readLine(Text str, int maxLineLength, int maxBytesToConsume) public boolean needAdditionalRecordAfterSplit() { return !finished && needAdditionalRecord; } + + @Override + protected void unsetNeedAdditionalRecordAfterSplit() { + needAdditionalRecord = false; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java index d33a614207..f0cf9f5a54 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java @@ -334,12 +334,72 @@ private Path createInputFile(Configuration conf, String data) @Test public void testUncompressedInput() throws Exception { Configuration conf = new Configuration(); - String inputData = "abc+++def+++ghi+++" - + "jkl+++mno+++pqr+++stu+++vw +++xyz"; + // single char delimiter, best case + String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz"; Path inputFile = createInputFile(conf, inputData); - conf.set("textinputformat.record.delimiter", "+++"); - for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { - for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.set("textinputformat.record.delimiter", "+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter, best case + inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // single char delimiter with empty records + inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with empty records + inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with starting part of the delimiter in the data + inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+-"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline as start of the delimiter + inputData = "abc\n+def\n+ghi\n+jkl\n+mno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "\n+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline in delimiter and in data + inputData = "abc\ndef+\nghi+\njkl\nmno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+\n"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { conf.setInt("io.file.buffer.size", bufferSize); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); } @@ -363,80 +423,126 @@ public void testUncompressedInputContainingCRLF() throws Exception { public void testUncompressedInputCustomDelimiterPosValue() throws Exception { Configuration conf = new Configuration(); - String inputData = "1234567890ab12ab345"; - Path inputFile = createInputFile(conf, inputData); conf.setInt("io.file.buffer.size", 10); conf.setInt(org.apache.hadoop.mapreduce.lib.input. LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); - String delimiter = "ab"; + String inputData = "abcdefghij++kl++mno"; + Path inputFile = createInputFile(conf, inputData); + String delimiter = "++"; byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); - FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + // the first split must contain two records to make sure that it also pulls + // in the record from the 2nd split + int splitLength = 15; + FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null); LineRecordReader reader = new LineRecordReader(conf, split, recordDelimiterBytes); LongWritable key = new LongWritable(); Text value = new Text(); - reader.next(key, value); - // Get first record:"1234567890" - assertEquals(10, value.getLength()); - // Position should be 12 right after "1234567890ab" - assertEquals(12, reader.getPos()); - reader.next(key, value); - // Get second record:"12" - assertEquals(2, value.getLength()); - // Position should be 16 right after "1234567890ab12ab" - assertEquals(16, reader.getPos()); - reader.next(key, value); - // Get third record:"345" - assertEquals(3, value.getLength()); - // Position should be 19 right after "1234567890ab12ab345" - assertEquals(19, reader.getPos()); + // Get first record: "abcdefghij" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong length for record value", 10, value.getLength()); + // Position should be 12 right after "abcdefghij++" + assertEquals("Wrong position after record read", 12, reader.getPos()); + // Get second record: "kl" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong length for record value", 2, value.getLength()); + // Position should be 16 right after "abcdefghij++kl++" + assertEquals("Wrong position after record read", 16, reader.getPos()); + // Get third record: "mno" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Position should be 19 right after "abcdefghij++kl++mno" + assertEquals("Wrong position after record read", 19, reader.getPos()); assertFalse(reader.next(key, value)); - assertEquals(19, reader.getPos()); - - split = new FileSplit(inputFile, 15, 4, (String[])null); - reader = new LineRecordReader(conf, split, recordDelimiterBytes); - // No record is in the second split because the second split dropped + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); + // No record is in the second split because the second split will drop // the first record, which was already reported by the first split. - // The position should be 19 right after "1234567890ab12ab345" - assertEquals(19, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(19, reader.getPos()); - - inputData = "123456789aab"; - inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 12, (String[])null); + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[]) null); reader = new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - // Position should be 12 right after "123456789aab" - assertEquals(12, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(12, reader.getPos()); + // The position should be 19 right after "abcdefghij++kl++mno" and should + // not change + assertEquals("Wrong position after record read", 19, reader.getPos()); + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); - inputData = "123456789a"; + // multi char delimiter with starting part of the delimiter in the data + inputData = "abcd+efgh++ijk++mno"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 10, (String[])null); + splitLength = 5; + split = new FileSplit(inputFile, 0, splitLength, (String[]) null); reader = new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - // Position should be 10 right after "123456789a" - assertEquals(10, reader.getPos()); + // Get first record: "abcd+efgh" + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 11, reader.getPos()); + assertEquals("Wrong length for record value", 9, value.getLength()); + // should have jumped over the delimiter, no record + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", 11, reader.getPos()); + reader.close(); + // next split: check for duplicate or dropped records + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[]) null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // Get second record: "ijk" first in this split + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 16, reader.getPos()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Get third record: "mno" second in this split + assertTrue("Expected record got nothing", reader.next(key, value)); + assertEquals("Wrong position after record read", 19, reader.getPos()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // should be at the end of the input assertFalse(reader.next(key, value)); - assertEquals(10, reader.getPos()); + assertEquals("Wrong position after record read", 19, reader.getPos()); + reader.close(); - inputData = "123456789ab"; + inputData = "abcd|efgh|+|ij|kl|+|mno|pqr"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 11, (String[])null); - reader = new LineRecordReader(conf, split, recordDelimiterBytes); - reader.next(key, value); - // Get first record:"123456789" - assertEquals(9, value.getLength()); - // Position should be 11 right after "123456789ab" - assertEquals(11, reader.getPos()); - assertFalse(reader.next(key, value)); - assertEquals(11, reader.getPos()); + delimiter = "|+|"; + recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + // walking over the buffer and split sizes checks for proper processing + // of the ambiguous bytes of the delimiter + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + split = new FileSplit(inputFile, 0, bufferSize, (String[]) null); + reader = new LineRecordReader(conf, split, recordDelimiterBytes); + // Get first record: "abcd|efgh" always possible + assertTrue("Expected record got nothing", reader.next(key, value)); + assertTrue("abcd|efgh".equals(value.toString())); + assertEquals("Wrong position after record read", 9, value.getLength()); + // Position should be 12 right after "|+|" + int recordPos = 12; + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + // get the next record: "ij|kl" if the split/buffer allows it + if (reader.next(key, value)) { + // check the record info: "ij|kl" + assertTrue("ij|kl".equals(value.toString())); + // Position should be 20 right after "|+|" + recordPos = 20; + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + } + // get the third record: "mno|pqr" if the split/buffer allows it + if (reader.next(key, value)) { + // check the record info: "mno|pqr" + assertTrue("mno|pqr".equals(value.toString())); + // Position should be 27 at the end of the string now + recordPos = inputData.length(); + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + } + // no more records can be read we should still be at the last position + assertFalse("Unexpected record returned", reader.next(key, value)); + assertEquals("Wrong position after record read", recordPos, + reader.getPos()); + reader.close(); + } + } } @Test diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java index dfe8b5d0d8..6819af7e97 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; @@ -320,16 +321,76 @@ private Path createInputFile(Configuration conf, String data) @Test public void testUncompressedInput() throws Exception { Configuration conf = new Configuration(); - String inputData = "abc+++def+++ghi+++" - + "jkl+++mno+++pqr+++stu+++vw +++xyz"; + // single char delimiter, best case + String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz"; Path inputFile = createInputFile(conf, inputData); - conf.set("textinputformat.record.delimiter", "+++"); + conf.set("textinputformat.record.delimiter", "+"); for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { conf.setInt("io.file.buffer.size", bufferSize); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); } } + // multi char delimiter, best case + inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // single char delimiter with empty records + inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with empty records + inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "|+|"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with starting part of the delimiter in the data + inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+-"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline as start of the delimiter + inputData = "abc\n+def\n+ghi\n+jkl\n+mno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "\n+"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } + // multi char delimiter with newline in delimiter and in data + inputData = "abc\ndef+\nghi+\njkl\nmno"; + inputFile = createInputFile(conf, inputData); + conf.set("textinputformat.record.delimiter", "+\n"); + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + conf.setInt("io.file.buffer.size", bufferSize); + testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); + } + } } @Test @@ -349,91 +410,145 @@ public void testUncompressedInputContainingCRLF() throws Exception { public void testUncompressedInputCustomDelimiterPosValue() throws Exception { Configuration conf = new Configuration(); - String inputData = "1234567890ab12ab345"; - Path inputFile = createInputFile(conf, inputData); conf.setInt("io.file.buffer.size", 10); conf.setInt(org.apache.hadoop.mapreduce.lib.input. LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); - String delimiter = "ab"; + String inputData = "abcdefghij++kl++mno"; + Path inputFile = createInputFile(conf, inputData); + String delimiter = "++"; byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); - FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); + int splitLength = 15; + FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null); TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); LineRecordReader reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - LongWritable key; - Text value; - reader.nextKeyValue(); - key = reader.getCurrentKey(); - value = reader.getCurrentValue(); - // Get first record:"1234567890" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); - reader.nextKeyValue(); - // Get second record:"12" - assertEquals(2, value.getLength()); - // Key should be 12 right after "1234567890ab" - assertEquals(12, key.get()); - reader.nextKeyValue(); - // Get third record:"345" - assertEquals(3, value.getLength()); - // Key should be 16 right after "1234567890ab12ab" - assertEquals(16, key.get()); + // Get first record: "abcdefghij" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + LongWritable key = reader.getCurrentKey(); + Text value = reader.getCurrentValue(); + assertEquals("Wrong length for record value", 10, value.getLength()); + assertEquals("Wrong position after record read", 0, key.get()); + // Get second record: "kl" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong length for record value", 2, value.getLength()); + // Key should be 12 right after "abcdefghij++" + assertEquals("Wrong position after record read", 12, key.get()); + // Get third record: "mno" + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Key should be 16 right after "abcdefghij++kl++" + assertEquals("Wrong position after record read", 16, key.get()); assertFalse(reader.nextKeyValue()); - // Key should be 19 right after "1234567890ab12ab345" - assertEquals(19, key.get()); - - split = new FileSplit(inputFile, 15, 4, (String[])null); + // Key should be 19 right after "abcdefghij++kl++mno" + assertEquals("Wrong position after record read", 19, key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + split = new FileSplit(inputFile, splitLength, + inputData.length() - splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); // No record is in the second split because the second split dropped // the first record, which was already reported by the first split. - assertFalse(reader.nextKeyValue()); + assertFalse("Unexpected record returned", reader.nextKeyValue()); + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); - inputData = "123456789aab"; + // multi char delimiter with starting part of the delimiter in the data + inputData = "abcd+efgh++ijk++mno"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 12, (String[])null); + splitLength = 5; + split = new FileSplit(inputFile, 0, splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - reader.nextKeyValue(); + // Get first record: "abcd+efgh" + assertTrue("Expected record got nothing", reader.nextKeyValue()); key = reader.getCurrentKey(); value = reader.getCurrentValue(); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); + assertEquals("Wrong position after record read", 0, key.get()); + assertEquals("Wrong length for record value", 9, value.getLength()); + // should have jumped over the delimiter, no record assertFalse(reader.nextKeyValue()); - // Key should be 12 right after "123456789aab" - assertEquals(12, key.get()); - - inputData = "123456789a"; - inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 10, (String[])null); + assertEquals("Wrong position after record read", 11, key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + // next split: check for duplicate or dropped records + split = new FileSplit(inputFile, splitLength, + inputData.length () - splitLength, (String[])null); reader = new LineRecordReader(recordDelimiterBytes); reader.initialize(split, context); - reader.nextKeyValue(); + assertTrue("Expected record got nothing", reader.nextKeyValue()); key = reader.getCurrentKey(); value = reader.getCurrentValue(); - // Get first record:"123456789a" - assertEquals(10, value.getLength()); - assertEquals(0, key.get()); + // Get second record: "ijk" first in this split + assertEquals("Wrong position after record read", 11, key.get()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // Get third record: "mno" second in this split + assertTrue("Expected record got nothing", reader.nextKeyValue()); + assertEquals("Wrong position after record read", 16, key.get()); + assertEquals("Wrong length for record value", 3, value.getLength()); + // should be at the end of the input assertFalse(reader.nextKeyValue()); - // Key should be 10 right after "123456789a" - assertEquals(10, key.get()); + assertEquals("Wrong position after record read", 19, key.get()); + reader.close(); - inputData = "123456789ab"; + inputData = "abcd|efgh|+|ij|kl|+|mno|pqr"; inputFile = createInputFile(conf, inputData); - split = new FileSplit(inputFile, 0, 11, (String[])null); - reader = new LineRecordReader(recordDelimiterBytes); - reader.initialize(split, context); - reader.nextKeyValue(); - key = reader.getCurrentKey(); - value = reader.getCurrentValue(); - // Get first record:"123456789" - assertEquals(9, value.getLength()); - assertEquals(0, key.get()); - assertFalse(reader.nextKeyValue()); - // Key should be 11 right after "123456789ab" - assertEquals(11, key.get()); + delimiter = "|+|"; + recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); + // walking over the buffer and split sizes checks for proper processing + // of the ambiguous bytes of the delimiter + for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { + for (int splitSize = 1; splitSize < inputData.length(); splitSize++) { + // track where we are in the inputdata + int keyPosition = 0; + conf.setInt("io.file.buffer.size", bufferSize); + split = new FileSplit(inputFile, 0, bufferSize, (String[]) null); + reader = new LineRecordReader(recordDelimiterBytes); + reader.initialize(split, context); + // Get the first record: "abcd|efgh" always possible + assertTrue("Expected record got nothing", reader.nextKeyValue()); + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + assertTrue("abcd|efgh".equals(value.toString())); + // Position should be 0 right at the start + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be 12 right after the first "|+|" + keyPosition = 12; + // get the next record: "ij|kl" if the split/buffer allows it + if (reader.nextKeyValue()) { + // check the record info: "ij|kl" + assertTrue("ij|kl".equals(value.toString())); + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be 20 after the second "|+|" + keyPosition = 20; + } + // get the third record: "mno|pqr" if the split/buffer allows it + if (reader.nextKeyValue()) { + // check the record info: "mno|pqr" + assertTrue("mno|pqr".equals(value.toString())); + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // Position should be the end of the input + keyPosition = inputData.length(); + } + assertFalse("Unexpected record returned", reader.nextKeyValue()); + // no more records can be read we should be at the last position + assertEquals("Wrong position after record read", keyPosition, + key.get()); + // after refresh should be empty + key = reader.getCurrentKey(); + assertNull("Unexpected key returned", key); + reader.close(); + } + } } @Test