HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1376592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-08-23 16:58:36 +00:00
parent 42beb56a2e
commit 34bd9794f7
3 changed files with 179 additions and 30 deletions

View File

@ -832,6 +832,9 @@ Release 2.0.0-alpha - 05-23-2012
HADOOP-7868. Hadoop native fails to compile when default linker HADOOP-7868. Hadoop native fails to compile when default linker
option is -Wl,--as-needed. (Trevor Robinson via eli) option is -Wl,--as-needed. (Trevor Robinson via eli)
HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via
bobby)
Release 0.23.3 - UNRELEASED Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -204,11 +204,13 @@ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
int startPosn = bufferPosn; //starting from where we left off the last time int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) { if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0; startPosn = bufferPosn = 0;
if (prevCharCR) if (prevCharCR) {
++bytesConsumed; //account for CR from previous read ++bytesConsumed; //account for CR from previous read
}
bufferLength = in.read(buffer); bufferLength = in.read(buffer);
if (bufferLength <= 0) if (bufferLength <= 0) {
break; // EOF break; // EOF
}
} }
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) { if (buffer[bufferPosn] == LF) {
@ -223,8 +225,9 @@ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
prevCharCR = (buffer[bufferPosn] == CR); prevCharCR = (buffer[bufferPosn] == CR);
} }
int readLength = bufferPosn - startPosn; int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer --readLength; //CR at the end of the buffer
}
bytesConsumed += readLength; bytesConsumed += readLength;
int appendLength = readLength - newlineLength; int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) { if (appendLength > maxLineLength - txtLength) {
@ -236,8 +239,9 @@ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
} }
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > (long)Integer.MAX_VALUE) if (bytesConsumed > (long)Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed); throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed; return (int)bytesConsumed;
} }
@ -246,18 +250,56 @@ private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
*/ */
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException { throws IOException {
/* We're reading data from inputStream, but the head of the stream may be
* already captured in the previous buffer, so we have several cases:
*
* 1. The buffer tail does not contain any character sequence which
* matches with the head of delimiter. We count it as a
* ambiguous byte count = 0
*
* 2. The buffer tail contains a X number of characters,
* that forms a sequence, which matches with the
* head of delimiter. We count ambiguous byte count = X
*
* // *** eg: A segment of input file is as follows
*
* " record 1792: I found this bug very interesting and
* I have completely read about it. record 1793: This bug
* can be solved easily record 1794: This ."
*
* delimiter = "record";
*
* supposing:- String at the end of buffer =
* "I found this bug very interesting and I have completely re"
* There for next buffer = "ad about it. record 179 ...."
*
* The matching characters in the input
* buffer tail and delimiter head = "re"
* Therefore, ambiguous byte count = 2 **** //
*
* 2.1 If the following bytes are the remaining characters of
* the delimiter, then we have to capture only up to the starting
* position of delimiter. That means, we need not include the
* ambiguous characters in str.
*
* 2.2 If the following bytes are not the remaining characters of
* the delimiter ( as mentioned in the example ),
* then we have to include the ambiguous characters in str.
*/
str.clear(); str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0; long bytesConsumed = 0;
int delPosn = 0; int delPosn = 0;
int ambiguousByteCount=0; // To capture the ambiguous characters count
do { do {
int startPosn = bufferPosn; // starting from where we left off the last int startPosn = bufferPosn; // Start from previous end position
// time
if (bufferPosn >= bufferLength) { if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0; startPosn = bufferPosn = 0;
bufferLength = in.read(buffer); bufferLength = in.read(buffer);
if (bufferLength <= 0) if (bufferLength <= 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
break; // EOF break; // EOF
}
} }
for (; bufferPosn < bufferLength; ++bufferPosn) { for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
@ -267,7 +309,7 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
break; break;
} }
} else if (delPosn != 0) { } else if (delPosn != 0) {
bufferPosn--; // recheck if bufferPosn matches start of delimiter bufferPosn--;
delPosn = 0; delPosn = 0;
} }
} }
@ -278,13 +320,26 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
appendLength = maxLineLength - txtLength; appendLength = maxLineLength - txtLength;
} }
if (appendLength > 0) { if (appendLength > 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
//appending the ambiguous characters (refer case 2.2)
bytesConsumed += ambiguousByteCount;
ambiguousByteCount=0;
}
str.append(buffer, startPosn, appendLength); str.append(buffer, startPosn, appendLength);
txtLength += appendLength; txtLength += appendLength;
} }
if (bufferPosn >= bufferLength) {
if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= ambiguousByteCount; //to be consumed in next
}
}
} while (delPosn < recordDelimiterBytes.length } while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume); && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > (long) Integer.MAX_VALUE) if (bytesConsumed > (long) Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed); throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
}
return (int) bytesConsumed; return (int) bytesConsumed;
} }
@ -297,7 +352,7 @@ private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
*/ */
public int readLine(Text str, int maxLineLength) throws IOException { public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE); return readLine(str, maxLineLength, Integer.MAX_VALUE);
} }
/** /**
* Read from the InputStream into the given Text. * Read from the InputStream into the given Text.
@ -308,5 +363,4 @@ public int readLine(Text str, int maxLineLength) throws IOException {
public int readLine(Text str) throws IOException { public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
} }
} }

View File

@ -21,29 +21,121 @@
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.junit.Test; import org.junit.Test;
import junit.framework.Assert; import junit.framework.Assert;
public class TestLineReader { public class TestLineReader {
private LineReader lineReader;
private String TestData;
private String Delimiter;
private Text line;
@Test @Test
public void testCustomDelimiter() throws Exception { public void testCustomDelimiter() throws Exception {
String data = "record Bangalorrecord recorrecordrecord Kerala"; /* TEST_1
String delimiter = "record"; * The test scenario is the tail of the buffer
LineReader reader = new LineReader( * equals the starting character/s of delimiter
new ByteArrayInputStream(data.getBytes()), *
delimiter.getBytes()); * The Test Data is such that,
Text line = new Text(); *
reader.readLine(line); * 1) we will have "</entity>" as delimiter
Assert.assertEquals("", line.toString()); *
reader.readLine(line); * 2) The tail of the current buffer would be "</"
Assert.assertEquals(" Bangalor", line.toString()); * which matches with the starting character sequence of delimiter.
reader.readLine(line); *
Assert.assertEquals(" recor", line.toString()); * 3) The Head of the next buffer would be "id>"
reader.readLine(line); * which does NOT match with the remaining characters of delimiter.
Assert.assertEquals("", line.toString()); *
reader.readLine(line); * 4) Input data would be prefixed by char 'a'
Assert.assertEquals(" Kerala", line.toString()); * about numberOfCharToFillTheBuffer times.
* So that, one iteration to buffer the input data,
* would end at '</' ie equals starting 2 char of delimiter
*
* 5) For this we would take BufferSize as 64 * 1024;
*
* Check Condition
* In the second key value pair, the value should contain
* "</" from currentToken and
* "id>" from next token
*/
Delimiter="</entity>";
String CurrentBufferTailToken=
"</entity><entity><id>Gelesh</";
// Ending part of Input Data Buffer
// It contains '</' ie delimiter character
String NextBufferHeadToken=
"id><name>Omathil</name></entity>";
// Supposing the start of next buffer is this
String Expected =
(CurrentBufferTailToken+NextBufferHeadToken)
.replace(Delimiter, "");
// Expected ,must capture from both the buffer, excluding Delimiter
String TestPartOfInput = CurrentBufferTailToken+NextBufferHeadToken;
int BufferSize=64 * 1024;
int numberOfCharToFillTheBuffer=BufferSize-CurrentBufferTailToken.length();
StringBuilder fillerString=new StringBuilder();
for (int i=0;i<numberOfCharToFillTheBuffer;i++) {
fillerString.append('a'); // char 'a' as a filler for the test string
}
TestData = fillerString + TestPartOfInput;
lineReader = new LineReader(
new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
line = new Text();
lineReader.readLine(line);
Assert.assertEquals(fillerString.toString(),line.toString());
lineReader.readLine(line);
Assert.assertEquals(Expected, line.toString());
/*TEST_2
* The test scenario is such that,
* the character/s preceding the delimiter,
* equals the starting character/s of delimiter
*/
Delimiter = "record";
StringBuilder TestStringBuilder = new StringBuilder();
TestStringBuilder.append(Delimiter+"Kerala ");
TestStringBuilder.append(Delimiter+"Bangalore");
TestStringBuilder.append(Delimiter+" North Korea");
TestStringBuilder.append(Delimiter+Delimiter+
"Guantanamo");
TestStringBuilder.append(Delimiter+"ecord"+"recor"+"core"); //~EOF with 're'
TestData=TestStringBuilder.toString();
lineReader = new LineReader(
new ByteArrayInputStream(TestData.getBytes()),Delimiter.getBytes());
lineReader.readLine(line);
Assert.assertEquals("",line.toString());
lineReader.readLine(line);
Assert.assertEquals("Kerala ",line.toString());
lineReader.readLine(line);
Assert.assertEquals("Bangalore",line.toString());
lineReader.readLine(line);
Assert.assertEquals(" North Korea",line.toString());
lineReader.readLine(line);
Assert.assertEquals("",line.toString());
lineReader.readLine(line);
Assert.assertEquals("Guantanamo",line.toString());
lineReader.readLine(line);
Assert.assertEquals(("ecord"+"recor"+"core"),line.toString());
} }
} }