MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause duplicate records (wilfreds via rkanter)

This commit is contained in:
Robert Kanter 2015-11-25 17:03:38 -08:00
parent e556c35b05
commit 7fd00b3db4
5 changed files with 361 additions and 123 deletions

View File

@ -333,6 +333,10 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
//appending the ambiguous characters (refer case 2.2) //appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount); str.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0; ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
} }
if (appendLength > 0) { if (appendLength > 0) {
str.append(buffer, startPosn, appendLength); str.append(buffer, startPosn, appendLength);
@ -380,4 +384,9 @@ protected int getBufferPosn() {
protected int getBufferSize() { protected int getBufferSize() {
return bufferSize; return bufferSize;
} }
protected void unsetNeedAdditionalRecordAfterSplit() {
// needed for custom multi byte line delimiters only
// see MAPREDUCE-6549 for details
}
} }

View File

@ -650,6 +650,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of MAPREDUCE-6557. Tests in mapreduce-client-app are writing outside of
target. (Akira AJISAKA via junping_du) target. (Akira AJISAKA via junping_du)
MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
duplicate records (wilfreds via rkanter)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -97,4 +97,9 @@ public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
public boolean needAdditionalRecordAfterSplit() { public boolean needAdditionalRecordAfterSplit() {
return !finished && needAdditionalRecord; return !finished && needAdditionalRecord;
} }
@Override
protected void unsetNeedAdditionalRecordAfterSplit() {
needAdditionalRecord = false;
}
} }

View File

@ -334,12 +334,72 @@ private Path createInputFile(Configuration conf, String data)
@Test @Test
public void testUncompressedInput() throws Exception { public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
String inputData = "abc+++def+++ghi+++" // single char delimiter, best case
+ "jkl+++mno+++pqr+++stu+++vw +++xyz"; String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData); Path inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+++"); conf.set("textinputformat.record.delimiter", "+");
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter, best case
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// single char delimiter with empty records
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with empty records
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with starting part of the delimiter in the data
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+-");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline as start of the delimiter
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "\n+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline in delimiter and in data
inputData = "abc\ndef+\nghi+\njkl\nmno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+\n");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize); conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
} }
@ -363,80 +423,126 @@ public void testUncompressedInputContainingCRLF() throws Exception {
public void testUncompressedInputCustomDelimiterPosValue() public void testUncompressedInputCustomDelimiterPosValue()
throws Exception { throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10); conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input. conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab"; String inputData = "abcdefghij++kl++mno";
Path inputFile = createInputFile(conf, inputData);
String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); // the first split must contain two records to make sure that it also pulls
// in the record from the 2nd split
int splitLength = 15;
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
LineRecordReader reader = new LineRecordReader(conf, split, LineRecordReader reader = new LineRecordReader(conf, split,
recordDelimiterBytes); recordDelimiterBytes);
LongWritable key = new LongWritable(); LongWritable key = new LongWritable();
Text value = new Text(); Text value = new Text();
reader.next(key, value); // Get first record: "abcdefghij"
// Get first record:"1234567890" assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals(10, value.getLength()); assertEquals("Wrong length for record value", 10, value.getLength());
// Position should be 12 right after "1234567890ab" // Position should be 12 right after "abcdefghij++"
assertEquals(12, reader.getPos()); assertEquals("Wrong position after record read", 12, reader.getPos());
reader.next(key, value); // Get second record: "kl"
// Get second record:"12" assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals(2, value.getLength()); assertEquals("Wrong length for record value", 2, value.getLength());
// Position should be 16 right after "1234567890ab12ab" // Position should be 16 right after "abcdefghij++kl++"
assertEquals(16, reader.getPos()); assertEquals("Wrong position after record read", 16, reader.getPos());
reader.next(key, value); // Get third record: "mno"
// Get third record:"345" assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals(3, value.getLength()); assertEquals("Wrong length for record value", 3, value.getLength());
// Position should be 19 right after "1234567890ab12ab345" // Position should be 19 right after "abcdefghij++kl++mno"
assertEquals(19, reader.getPos()); assertEquals("Wrong position after record read", 19, reader.getPos());
assertFalse(reader.next(key, value)); assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos()); assertEquals("Wrong position after record read", 19, reader.getPos());
reader.close();
split = new FileSplit(inputFile, 15, 4, (String[])null); // No record is in the second split because the second split will drop
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// No record is in the second split because the second split dropped
// the first record, which was already reported by the first split. // the first record, which was already reported by the first split.
// The position should be 19 right after "1234567890ab12ab345" split = new FileSplit(inputFile, splitLength,
assertEquals(19, reader.getPos()); inputData.length() - splitLength, (String[]) null);
assertFalse(reader.next(key, value));
assertEquals(19, reader.getPos());
inputData = "123456789aab";
inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes); reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value); // The position should be 19 right after "abcdefghij++kl++mno" and should
// Get first record:"123456789a" // not change
assertEquals(10, value.getLength()); assertEquals("Wrong position after record read", 19, reader.getPos());
// Position should be 12 right after "123456789aab" assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals(12, reader.getPos()); assertEquals("Wrong position after record read", 19, reader.getPos());
assertFalse(reader.next(key, value)); reader.close();
assertEquals(12, reader.getPos());
inputData = "123456789a"; // multi char delimiter with starting part of the delimiter in the data
inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData); inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 10, (String[])null); splitLength = 5;
split = new FileSplit(inputFile, 0, splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes); reader = new LineRecordReader(conf, split, recordDelimiterBytes);
reader.next(key, value); // Get first record: "abcd+efgh"
// Get first record:"123456789a" assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals(10, value.getLength()); assertEquals("Wrong position after record read", 11, reader.getPos());
// Position should be 10 right after "123456789a" assertEquals("Wrong length for record value", 9, value.getLength());
assertEquals(10, reader.getPos()); // should have jumped over the delimiter, no record
assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals("Wrong position after record read", 11, reader.getPos());
reader.close();
// next split: check for duplicate or dropped records
split = new FileSplit(inputFile, splitLength,
inputData.length() - splitLength, (String[]) null);
reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// Get second record: "ijk" first in this split
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong position after record read", 16, reader.getPos());
assertEquals("Wrong length for record value", 3, value.getLength());
// Get third record: "mno" second in this split
assertTrue("Expected record got nothing", reader.next(key, value));
assertEquals("Wrong position after record read", 19, reader.getPos());
assertEquals("Wrong length for record value", 3, value.getLength());
// should be at the end of the input
assertFalse(reader.next(key, value)); assertFalse(reader.next(key, value));
assertEquals(10, reader.getPos()); assertEquals("Wrong position after record read", 19, reader.getPos());
reader.close();
inputData = "123456789ab"; inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData); inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null); delimiter = "|+|";
reader = new LineRecordReader(conf, split, recordDelimiterBytes); recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
reader.next(key, value); // walking over the buffer and split sizes checks for proper processing
// Get first record:"123456789" // of the ambiguous bytes of the delimiter
assertEquals(9, value.getLength()); for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
// Position should be 11 right after "123456789ab" for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
assertEquals(11, reader.getPos()); conf.setInt("io.file.buffer.size", bufferSize);
assertFalse(reader.next(key, value)); split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
assertEquals(11, reader.getPos()); reader = new LineRecordReader(conf, split, recordDelimiterBytes);
// Get first record: "abcd|efgh" always possible
assertTrue("Expected record got nothing", reader.next(key, value));
assertTrue("abcd|efgh".equals(value.toString()));
assertEquals("Wrong position after record read", 9, value.getLength());
// Position should be 12 right after "|+|"
int recordPos = 12;
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
// get the next record: "ij|kl" if the split/buffer allows it
if (reader.next(key, value)) {
// check the record info: "ij|kl"
assertTrue("ij|kl".equals(value.toString()));
// Position should be 20 right after "|+|"
recordPos = 20;
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
}
// get the third record: "mno|pqr" if the split/buffer allows it
if (reader.next(key, value)) {
// check the record info: "mno|pqr"
assertTrue("mno|pqr".equals(value.toString()));
// Position should be 27 at the end of the string now
recordPos = inputData.length();
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
}
// no more records can be read we should still be at the last position
assertFalse("Unexpected record returned", reader.next(key, value));
assertEquals("Wrong position after record read", recordPos,
reader.getPos());
reader.close();
}
}
} }
@Test @Test

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
@ -320,16 +321,76 @@ private Path createInputFile(Configuration conf, String data)
@Test @Test
public void testUncompressedInput() throws Exception { public void testUncompressedInput() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
String inputData = "abc+++def+++ghi+++" // single char delimiter, best case
+ "jkl+++mno+++pqr+++stu+++vw +++xyz"; String inputData = "abc+def+ghi+jkl+mno+pqr+stu+vw +xyz";
Path inputFile = createInputFile(conf, inputData); Path inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+++"); conf.set("textinputformat.record.delimiter", "+");
for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) { for(int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for(int splitSize = 1; splitSize < inputData.length(); splitSize++) { for(int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize); conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile); testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
} }
} }
// multi char delimiter, best case
inputData = "abc|+|def|+|ghi|+|jkl|+|mno|+|pqr|+|stu|+|vw |+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// single char delimiter with empty records
inputData = "abc+def++ghi+jkl++mno+pqr++stu+vw ++xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with empty records
inputData = "abc|+||+|defghi|+|jkl|+||+|mno|+|pqr|+||+|stu|+|vw |+||+|xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "|+|");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with starting part of the delimiter in the data
inputData = "abc+def+-ghi+jkl+-mno+pqr+-stu+vw +-xyz";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+-");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline as start of the delimiter
inputData = "abc\n+def\n+ghi\n+jkl\n+mno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "\n+");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
// multi char delimiter with newline in delimiter and in data
inputData = "abc\ndef+\nghi+\njkl\nmno";
inputFile = createInputFile(conf, inputData);
conf.set("textinputformat.record.delimiter", "+\n");
for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
conf.setInt("io.file.buffer.size", bufferSize);
testSplitRecordsForFile(conf, splitSize, inputData.length(), inputFile);
}
}
} }
@Test @Test
@ -349,91 +410,145 @@ public void testUncompressedInputContainingCRLF() throws Exception {
public void testUncompressedInputCustomDelimiterPosValue() public void testUncompressedInputCustomDelimiterPosValue()
throws Exception { throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
String inputData = "1234567890ab12ab345";
Path inputFile = createInputFile(conf, inputData);
conf.setInt("io.file.buffer.size", 10); conf.setInt("io.file.buffer.size", 10);
conf.setInt(org.apache.hadoop.mapreduce.lib.input. conf.setInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
String delimiter = "ab"; String inputData = "abcdefghij++kl++mno";
Path inputFile = createInputFile(conf, inputData);
String delimiter = "++";
byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); byte[] recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
FileSplit split = new FileSplit(inputFile, 0, 15, (String[])null); int splitLength = 15;
FileSplit split = new FileSplit(inputFile, 0, splitLength, (String[])null);
TaskAttemptContext context = new TaskAttemptContextImpl(conf, TaskAttemptContext context = new TaskAttemptContextImpl(conf,
new TaskAttemptID()); new TaskAttemptID());
LineRecordReader reader = new LineRecordReader(recordDelimiterBytes); LineRecordReader reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context); reader.initialize(split, context);
LongWritable key; // Get first record: "abcdefghij"
Text value; assertTrue("Expected record got nothing", reader.nextKeyValue());
reader.nextKeyValue(); LongWritable key = reader.getCurrentKey();
key = reader.getCurrentKey(); Text value = reader.getCurrentValue();
value = reader.getCurrentValue(); assertEquals("Wrong length for record value", 10, value.getLength());
// Get first record:"1234567890" assertEquals("Wrong position after record read", 0, key.get());
assertEquals(10, value.getLength()); // Get second record: "kl"
assertEquals(0, key.get()); assertTrue("Expected record got nothing", reader.nextKeyValue());
reader.nextKeyValue(); assertEquals("Wrong length for record value", 2, value.getLength());
// Get second record:"12" // Key should be 12 right after "abcdefghij++"
assertEquals(2, value.getLength()); assertEquals("Wrong position after record read", 12, key.get());
// Key should be 12 right after "1234567890ab" // Get third record: "mno"
assertEquals(12, key.get()); assertTrue("Expected record got nothing", reader.nextKeyValue());
reader.nextKeyValue(); assertEquals("Wrong length for record value", 3, value.getLength());
// Get third record:"345" // Key should be 16 right after "abcdefghij++kl++"
assertEquals(3, value.getLength()); assertEquals("Wrong position after record read", 16, key.get());
// Key should be 16 right after "1234567890ab12ab"
assertEquals(16, key.get());
assertFalse(reader.nextKeyValue()); assertFalse(reader.nextKeyValue());
// Key should be 19 right after "1234567890ab12ab345" // Key should be 19 right after "abcdefghij++kl++mno"
assertEquals(19, key.get()); assertEquals("Wrong position after record read", 19, key.get());
// after refresh should be empty
split = new FileSplit(inputFile, 15, 4, (String[])null); key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
split = new FileSplit(inputFile, splitLength,
inputData.length() - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes); reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context); reader.initialize(split, context);
// No record is in the second split because the second split dropped // No record is in the second split because the second split dropped
// the first record, which was already reported by the first split. // the first record, which was already reported by the first split.
assertFalse(reader.nextKeyValue()); assertFalse("Unexpected record returned", reader.nextKeyValue());
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
inputData = "123456789aab"; // multi char delimiter with starting part of the delimiter in the data
inputData = "abcd+efgh++ijk++mno";
inputFile = createInputFile(conf, inputData); inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 12, (String[])null); splitLength = 5;
split = new FileSplit(inputFile, 0, splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes); reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context); reader.initialize(split, context);
reader.nextKeyValue(); // Get first record: "abcd+efgh"
assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey(); key = reader.getCurrentKey();
value = reader.getCurrentValue(); value = reader.getCurrentValue();
// Get first record:"123456789a" assertEquals("Wrong position after record read", 0, key.get());
assertEquals(10, value.getLength()); assertEquals("Wrong length for record value", 9, value.getLength());
assertEquals(0, key.get()); // should have jumped over the delimiter, no record
assertFalse(reader.nextKeyValue()); assertFalse(reader.nextKeyValue());
// Key should be 12 right after "123456789aab" assertEquals("Wrong position after record read", 11, key.get());
assertEquals(12, key.get()); // after refresh should be empty
key = reader.getCurrentKey();
inputData = "123456789a"; assertNull("Unexpected key returned", key);
inputFile = createInputFile(conf, inputData); reader.close();
split = new FileSplit(inputFile, 0, 10, (String[])null); // next split: check for duplicate or dropped records
split = new FileSplit(inputFile, splitLength,
inputData.length () - splitLength, (String[])null);
reader = new LineRecordReader(recordDelimiterBytes); reader = new LineRecordReader(recordDelimiterBytes);
reader.initialize(split, context); reader.initialize(split, context);
reader.nextKeyValue(); assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey(); key = reader.getCurrentKey();
value = reader.getCurrentValue(); value = reader.getCurrentValue();
// Get first record:"123456789a" // Get second record: "ijk" first in this split
assertEquals(10, value.getLength()); assertEquals("Wrong position after record read", 11, key.get());
assertEquals(0, key.get()); assertEquals("Wrong length for record value", 3, value.getLength());
// Get third record: "mno" second in this split
assertTrue("Expected record got nothing", reader.nextKeyValue());
assertEquals("Wrong position after record read", 16, key.get());
assertEquals("Wrong length for record value", 3, value.getLength());
// should be at the end of the input
assertFalse(reader.nextKeyValue()); assertFalse(reader.nextKeyValue());
// Key should be 10 right after "123456789a" assertEquals("Wrong position after record read", 19, key.get());
assertEquals(10, key.get()); reader.close();
inputData = "123456789ab"; inputData = "abcd|efgh|+|ij|kl|+|mno|pqr";
inputFile = createInputFile(conf, inputData); inputFile = createInputFile(conf, inputData);
split = new FileSplit(inputFile, 0, 11, (String[])null); delimiter = "|+|";
reader = new LineRecordReader(recordDelimiterBytes); recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
reader.initialize(split, context); // walking over the buffer and split sizes checks for proper processing
reader.nextKeyValue(); // of the ambiguous bytes of the delimiter
key = reader.getCurrentKey(); for (int bufferSize = 1; bufferSize <= inputData.length(); bufferSize++) {
value = reader.getCurrentValue(); for (int splitSize = 1; splitSize < inputData.length(); splitSize++) {
// Get first record:"123456789" // track where we are in the inputdata
assertEquals(9, value.getLength()); int keyPosition = 0;
assertEquals(0, key.get()); conf.setInt("io.file.buffer.size", bufferSize);
assertFalse(reader.nextKeyValue()); split = new FileSplit(inputFile, 0, bufferSize, (String[]) null);
// Key should be 11 right after "123456789ab" reader = new LineRecordReader(recordDelimiterBytes);
assertEquals(11, key.get()); reader.initialize(split, context);
// Get the first record: "abcd|efgh" always possible
assertTrue("Expected record got nothing", reader.nextKeyValue());
key = reader.getCurrentKey();
value = reader.getCurrentValue();
assertTrue("abcd|efgh".equals(value.toString()));
// Position should be 0 right at the start
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be 12 right after the first "|+|"
keyPosition = 12;
// get the next record: "ij|kl" if the split/buffer allows it
if (reader.nextKeyValue()) {
// check the record info: "ij|kl"
assertTrue("ij|kl".equals(value.toString()));
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be 20 after the second "|+|"
keyPosition = 20;
}
// get the third record: "mno|pqr" if the split/buffer allows it
if (reader.nextKeyValue()) {
// check the record info: "mno|pqr"
assertTrue("mno|pqr".equals(value.toString()));
assertEquals("Wrong position after record read", keyPosition,
key.get());
// Position should be the end of the input
keyPosition = inputData.length();
}
assertFalse("Unexpected record returned", reader.nextKeyValue());
// no more records can be read we should be at the last position
assertEquals("Wrong position after record read", keyPosition,
key.get());
// after refresh should be empty
key = reader.getCurrentKey();
assertNull("Unexpected key returned", key);
reader.close();
}
}
} }
@Test @Test