HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat. Contributed by Ahmed Radwan.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1068729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-02-09 01:41:08 +00:00
parent a805223b4e
commit 223f651118
2 changed files with 124 additions and 4 deletions

View File

@ -13,6 +13,9 @@ Trunk (unreleased changes)
HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling
via hairong) via hairong)
HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat
(Ahmed Radwan via todd)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7042. Updates to test-patch.sh to include failed test names and HADOOP-7042. Updates to test-patch.sh to include failed test names and

View File

@ -28,6 +28,14 @@
/** /**
* A class that provides a line reader from an input stream. * A class that provides a line reader from an input stream.
* Depending on the constructor used, lines will either be terminated by:
* <ul>
* <li>one of the following: '\n' (LF) , '\r' (CR),
* or '\r\n' (CR+LF).</li>
* <li><em>or</em>, a custom byte sequence delimiter</li>
* </ul>
* In both cases, EOF also terminates an otherwise unterminated
* line.
*/ */
@InterfaceAudience.LimitedPrivate({"MapReduce"}) @InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable @InterfaceStability.Unstable
@ -44,6 +52,9 @@ public class LineReader {
private static final byte CR = '\r'; private static final byte CR = '\r';
private static final byte LF = '\n'; private static final byte LF = '\n';
// The line delimiter
private final byte[] recordDelimiterBytes;
/** /**
* Create a line reader that reads from the given stream using the * Create a line reader that reads from the given stream using the
* default buffer-size (64k). * default buffer-size (64k).
@ -65,6 +76,7 @@ public LineReader(InputStream in, int bufferSize) {
this.in = in; this.in = in;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize]; this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = null;
} }
/** /**
@ -79,6 +91,56 @@ public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
} }
/**
* Create a line reader that reads from the given stream using the
* default buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param recordDelimiterBytes The delimiter
*/
public LineReader(InputStream in, byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = DEFAULT_BUFFER_SIZE;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* given buffer-size, and using a custom delimiter of array of
* bytes.
* @param in The input stream
* @param bufferSize Size of the read buffer
* @param recordDelimiterBytes The delimiter
* @throws IOException
*/
public LineReader(InputStream in, int bufferSize,
byte[] recordDelimiterBytes) {
this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/**
* Create a line reader that reads from the given stream using the
* <code>io.file.buffer.size</code> specified in the given
* <code>Configuration</code>, and using a custom delimiter of array of
* bytes.
* @param in input stream
* @param conf configuration
* @param recordDelimiterBytes The delimiter
* @throws IOException
*/
public LineReader(InputStream in, Configuration conf,
byte[] recordDelimiterBytes) throws IOException {
this.in = in;
this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
this.buffer = new byte[this.bufferSize];
this.recordDelimiterBytes = recordDelimiterBytes;
}
/** /**
* Close the underlying stream. * Close the underlying stream.
* @throws IOException * @throws IOException
@ -88,10 +150,7 @@ public void close() throws IOException {
} }
/** /**
* Read one line from the InputStream into the given Text. A line * Read one line from the InputStream into the given Text.
* can be terminated by one of the following: '\n' (LF) , '\r' (CR),
* or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
* line.
* *
* @param str the object to store the given line (without newline) * @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str; * @param maxLineLength the maximum number of bytes to store into str;
@ -108,6 +167,18 @@ public void close() throws IOException {
*/ */
public int readLine(Text str, int maxLineLength, public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException { int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}
/**
* Read a line terminated by one of CR, LF, or CRLF.
*/
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
/* We're reading data from in, but the head of the stream may be /* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases: * already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy * 1. No newline characters are in the buffer, so we need to copy
@ -170,6 +241,52 @@ public int readLine(Text str, int maxLineLength,
return (int)bytesConsumed; return (int)bytesConsumed;
} }
/**
* Read a line terminated by a custom delimiter.
*/
private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
str.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
do {
int startPosn = bufferPosn; // starting from where we left off the last
// time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = in.read(buffer);
if (bufferLength <= 0)
break; // EOF
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
delPosn++;
if (delPosn >= recordDelimiterBytes.length) {
bufferPosn++;
break;
}
} else {
delPosn = 0;
}
}
int readLength = bufferPosn - startPosn;
bytesConsumed += readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
if (bytesConsumed > (long) Integer.MAX_VALUE)
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
return (int) bytesConsumed;
}
/** /**
* Read from the InputStream into the given Text. * Read from the InputStream into the given Text.
* @param str the object to store the given line * @param str the object to store the given line