MAPREDUCE-5777. Support utf-8 text with Byte Order Marker. (Zhihai Xu via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1600977 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3d077726d0
commit
40ba8c17c1
@ -250,6 +250,9 @@ Release 2.5.0 - UNRELEASED
|
||||
MAPREDUCE-5895. Close streams properly to avoid leakage in TaskLog.
|
||||
(Kousuke Saruta via devaraj)
|
||||
|
||||
MAPREDUCE-5777. Support utf-8 text with Byte Order Marker.
|
||||
(Zhihai Xu via kasha)
|
||||
|
||||
Release 2.4.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -91,6 +91,7 @@
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
|
||||
<exclude>src/test/resources/testBOM.txt</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
@ -197,6 +197,39 @@ private long getFilePosition() throws IOException {
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private int skipUtfByteOrderMark(Text value) throws IOException {
|
||||
// Strip BOM(Byte Order Mark)
|
||||
// Text only support UTF-8, we only need to check UTF-8 BOM
|
||||
// (0xEF,0xBB,0xBF) at the start of the text stream.
|
||||
int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
|
||||
Integer.MAX_VALUE);
|
||||
int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
|
||||
// Even we read 3 extra bytes for the first line,
|
||||
// we won't alter existing behavior (no backwards incompat issue).
|
||||
// Because the newSize is less than maxLineLength and
|
||||
// the number of bytes copied to Text is always no more than newSize.
|
||||
// If the return size from readLine is not less than maxLineLength,
|
||||
// we will discard the current line and read the next line.
|
||||
pos += newSize;
|
||||
int textLength = value.getLength();
|
||||
byte[] textBytes = value.getBytes();
|
||||
if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
|
||||
(textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
|
||||
// find UTF-8 BOM, strip it.
|
||||
LOG.info("Found UTF-8 BOM and skipped it");
|
||||
textLength -= 3;
|
||||
newSize -= 3;
|
||||
if (textLength > 0) {
|
||||
// It may work to use the same buffer and not do the copyBytes
|
||||
textBytes = value.copyBytes();
|
||||
value.set(textBytes, 3, textLength);
|
||||
} else {
|
||||
value.clear();
|
||||
}
|
||||
}
|
||||
return newSize;
|
||||
}
|
||||
|
||||
/** Read a line. */
|
||||
public synchronized boolean next(LongWritable key, Text value)
|
||||
throws IOException {
|
||||
@ -206,11 +239,17 @@ public synchronized boolean next(LongWritable key, Text value)
|
||||
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
|
||||
key.set(pos);
|
||||
|
||||
int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
|
||||
int newSize = 0;
|
||||
if (pos == 0) {
|
||||
newSize = skipUtfByteOrderMark(value);
|
||||
} else {
|
||||
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
|
||||
pos += newSize;
|
||||
}
|
||||
|
||||
if (newSize == 0) {
|
||||
return false;
|
||||
}
|
||||
pos += newSize;
|
||||
if (newSize < maxLineLength) {
|
||||
return true;
|
||||
}
|
||||
|
@ -134,6 +134,39 @@ private long getFilePosition() throws IOException {
|
||||
return retVal;
|
||||
}
|
||||
|
||||
private int skipUtfByteOrderMark() throws IOException {
|
||||
// Strip BOM(Byte Order Mark)
|
||||
// Text only support UTF-8, we only need to check UTF-8 BOM
|
||||
// (0xEF,0xBB,0xBF) at the start of the text stream.
|
||||
int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
|
||||
Integer.MAX_VALUE);
|
||||
int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
|
||||
// Even we read 3 extra bytes for the first line,
|
||||
// we won't alter existing behavior (no backwards incompat issue).
|
||||
// Because the newSize is less than maxLineLength and
|
||||
// the number of bytes copied to Text is always no more than newSize.
|
||||
// If the return size from readLine is not less than maxLineLength,
|
||||
// we will discard the current line and read the next line.
|
||||
pos += newSize;
|
||||
int textLength = value.getLength();
|
||||
byte[] textBytes = value.getBytes();
|
||||
if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
|
||||
(textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
|
||||
// find UTF-8 BOM, strip it.
|
||||
LOG.info("Found UTF-8 BOM and skipped it");
|
||||
textLength -= 3;
|
||||
newSize -= 3;
|
||||
if (textLength > 0) {
|
||||
// It may work to use the same buffer and not do the copyBytes
|
||||
textBytes = value.copyBytes();
|
||||
value.set(textBytes, 3, textLength);
|
||||
} else {
|
||||
value.clear();
|
||||
}
|
||||
}
|
||||
return newSize;
|
||||
}
|
||||
|
||||
public boolean nextKeyValue() throws IOException {
|
||||
if (key == null) {
|
||||
key = new LongWritable();
|
||||
@ -146,9 +179,14 @@ public boolean nextKeyValue() throws IOException {
|
||||
// We always read one extra line, which lies outside the upper
|
||||
// split limit i.e. (end - 1)
|
||||
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
|
||||
if (pos == 0) {
|
||||
newSize = skipUtfByteOrderMark();
|
||||
} else {
|
||||
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
|
||||
pos += newSize;
|
||||
if (newSize < maxLineLength) {
|
||||
}
|
||||
|
||||
if ((newSize == 0) || (newSize < maxLineLength)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -188,4 +188,41 @@ public void testRecordSpanningMultipleSplitsCompressed()
|
||||
checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
|
||||
200 * 1000, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStripBOM() throws IOException {
|
||||
// the test data contains a BOM at the start of the file
|
||||
// confirm the BOM is skipped by LineRecordReader
|
||||
String UTF8_BOM = "\uFEFF";
|
||||
URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
|
||||
assertNotNull("Cannot find testBOM.txt", testFileUrl);
|
||||
File testFile = new File(testFileUrl.getFile());
|
||||
Path testFilePath = new Path(testFile.getAbsolutePath());
|
||||
long testFileSize = testFile.length();
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
||||
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||
|
||||
// read the data and check whether BOM is skipped
|
||||
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
|
||||
(String[])null);
|
||||
LineRecordReader reader = new LineRecordReader(conf, split);
|
||||
LongWritable key = new LongWritable();
|
||||
Text value = new Text();
|
||||
int numRecords = 0;
|
||||
boolean firstLine = true;
|
||||
boolean skipBOM = true;
|
||||
while (reader.next(key, value)) {
|
||||
if (firstLine) {
|
||||
firstLine = false;
|
||||
if (value.toString().startsWith(UTF8_BOM)) {
|
||||
skipBOM = false;
|
||||
}
|
||||
}
|
||||
++numRecords;
|
||||
}
|
||||
reader.close();
|
||||
|
||||
assertTrue("BOM is not skipped", skipBOM);
|
||||
}
|
||||
}
|
||||
|
@ -193,4 +193,42 @@ public void testRecordSpanningMultipleSplitsCompressed()
|
||||
200 * 1000,
|
||||
true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStripBOM() throws IOException {
|
||||
// the test data contains a BOM at the start of the file
|
||||
// confirm the BOM is skipped by LineRecordReader
|
||||
String UTF8_BOM = "\uFEFF";
|
||||
URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
|
||||
assertNotNull("Cannot find testBOM.txt", testFileUrl);
|
||||
File testFile = new File(testFileUrl.getFile());
|
||||
Path testFilePath = new Path(testFile.getAbsolutePath());
|
||||
long testFileSize = testFile.length();
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(org.apache.hadoop.mapreduce.lib.input.
|
||||
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
|
||||
|
||||
TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
|
||||
|
||||
// read the data and check whether BOM is skipped
|
||||
FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
|
||||
(String[])null);
|
||||
LineRecordReader reader = new LineRecordReader();
|
||||
reader.initialize(split, context);
|
||||
int numRecords = 0;
|
||||
boolean firstLine = true;
|
||||
boolean skipBOM = true;
|
||||
while (reader.nextKeyValue()) {
|
||||
if (firstLine) {
|
||||
firstLine = false;
|
||||
if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
|
||||
skipBOM = false;
|
||||
}
|
||||
}
|
||||
++numRecords;
|
||||
}
|
||||
reader.close();
|
||||
|
||||
assertTrue("BOM is not skipped", skipBOM);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,2 @@
|
||||
BOM(Byte Order Mark) test file
|
||||
BOM(Byte Order Mark) test file
|
Loading…
Reference in New Issue
Block a user