From 40ba8c17c1500703c47a154f06708e5924c24e65 Mon Sep 17 00:00:00 2001
From: Karthik Kambatla
Date: Fri, 6 Jun 2014 18:37:03 +0000
Subject: [PATCH] MAPREDUCE-5777. Support utf-8 text with Byte Order Marker.
 (Zhihai Xu via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1600977 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt           |  3 ++
 .../hadoop-mapreduce-client-core/pom.xml       |  1 +
 .../hadoop/mapred/LineRecordReader.java        | 43 +++++++++++++++++-
 .../mapreduce/lib/input/LineRecordReader.java  | 44 +++++++++++++++++--
 .../hadoop/mapred/TestLineRecordReader.java    | 37 ++++++++++++++++
 .../lib/input/TestLineRecordReader.java        | 38 ++++++++++++++++
 .../src/test/resources/testBOM.txt             |  2 +
 7 files changed, 163 insertions(+), 5 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/testBOM.txt

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9d7f3ad737..79ba4d0ede 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -250,6 +250,9 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5895. Close streams properly to avoid leakage in TaskLog.
     (Kousuke Saruta via devaraj)
 
+    MAPREDUCE-5777. Support utf-8 text with Byte Order Marker.
+    (Zhihai Xu via kasha)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
index 3ca6dc75e1..6e9d543f71 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
@@ -91,6 +91,7 @@
             <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+            <exclude>src/test/resources/testBOM.txt</exclude>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index 8b26fbd16d..6b5c26ee47 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -197,6 +197,39 @@ private long getFilePosition() throws IOException {
     return retVal;
   }
 
+  private int skipUtfByteOrderMark(Text value) throws IOException {
+    // Strip the BOM (Byte Order Mark).
+    // Text only supports UTF-8, so we only need to check for the UTF-8 BOM
+    // (0xEF,0xBB,0xBF) at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even though we read 3 extra bytes for the first line,
+    // we won't alter existing behavior (no backwards-incompat issue),
+    // because the newSize is less than maxLineLength and
+    // the number of bytes copied to Text is always no more than newSize.
+    // If the size returned from readLine is not less than maxLineLength,
+    // we will discard the current line and read the next line.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // Found the UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It may work to use the same buffer and not do the copyBytes
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
@@ -206,11 +239,17 @@ public synchronized boolean next(LongWritable key, Text value)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+      int newSize = 0;
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark(value);
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
       if (newSize == 0) {
         return false;
       }
-      pos += newSize;
       if (newSize < maxLineLength) {
         return true;
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
index 6d12d82d17..880a1a2194 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
@@ -134,6 +134,39 @@ private long getFilePosition() throws IOException {
     return retVal;
   }
 
+  private int skipUtfByteOrderMark() throws IOException {
+    // Strip the BOM (Byte Order Mark).
+    // Text only supports UTF-8, so we only need to check for the UTF-8 BOM
+    // (0xEF,0xBB,0xBF) at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even though we read 3 extra bytes for the first line,
+    // we won't alter existing behavior (no backwards-incompat issue),
+    // because the newSize is less than maxLineLength and
+    // the number of bytes copied to Text is always no more than newSize.
+    // If the size returned from readLine is not less than maxLineLength,
+    // we will discard the current line and read the next line.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // Found the UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It may work to use the same buffer and not do the copyBytes
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -146,9 +179,14 @@ public boolean nextKeyValue() throws IOException {
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
-      pos += newSize;
-      if (newSize < maxLineLength) {
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark();
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
+      if ((newSize == 0) || (newSize < maxLineLength)) {
         break;
       }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
index ee066e218f..7b664e93ac 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
@@ -188,4 +188,41 @@ public void testRecordSpanningMultipleSplitsCompressed()
     checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
                                       200 * 1000, true);
   }
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.next(key, value)) {
+      if (firstLine) {
+        firstLine = false;
+        if (value.toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
index 46091850ea..a1b5147c0c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
@@ -193,4 +193,42 @@ public void testRecordSpanningMultipleSplitsCompressed()
                                       200 * 1000, true);
   }
 
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.nextKeyValue()) {
+      if (firstLine) {
+        firstLine = false;
+        if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/testBOM.txt b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/testBOM.txt
new file mode 100644
index 0000000000..561f454987
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/resources/testBOM.txt
@@ -0,0 +1,2 @@
+BOM(Byte Order Mark) test file
+BOM(Byte Order Mark) test file
\ No newline at end of file
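
Background on why a single three-byte check is sufficient: Hadoop's Text type always holds UTF-8, and the UTF-8 encoding of the BOM code point U+FEFF is exactly 0xEF 0xBB 0xBF, which is the sequence both record readers test for. The following standalone sketch is not part of the patch; the BomCheckDemo class and stripUtf8Bom helper are illustrative names chosen here to mirror the byte comparison used in skipUtfByteOrderMark().

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BomCheckDemo {

  // Return the input without a leading UTF-8 BOM (0xEF,0xBB,0xBF), if one is present.
  static byte[] stripUtf8Bom(byte[] bytes) {
    if (bytes.length >= 3 && bytes[0] == (byte) 0xEF
        && bytes[1] == (byte) 0xBB && bytes[2] == (byte) 0xBF) {
      return Arrays.copyOfRange(bytes, 3, bytes.length);
    }
    return bytes;
  }

  public static void main(String[] args) {
    // U+FEFF encodes to exactly three bytes in UTF-8, so only the first three
    // bytes of the stream need to be inspected.
    byte[] bom = "\uFEFF".getBytes(StandardCharsets.UTF_8);
    System.out.printf("UTF-8 BOM: %02X %02X %02X%n",
        bom[0] & 0xFF, bom[1] & 0xFF, bom[2] & 0xFF);    // prints EF BB BF

    // Same first line as testBOM.txt, prefixed with a BOM.
    byte[] line = "\uFEFFBOM(Byte Order Mark) test file".getBytes(StandardCharsets.UTF_8);
    String stripped = new String(stripUtf8Bom(line), StandardCharsets.UTF_8);
    System.out.println(stripped.startsWith("\uFEFF"));   // false: the BOM was removed
    System.out.println(stripped);                        // BOM(Byte Order Mark) test file
  }
}

Note that in the patch the BOM is stripped only when pos == 0, i.e. only for the split that starts at byte 0 of the file, so splits that begin mid-file are read exactly as before.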