diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
index d56822f0c8..51db0b3f0a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/MapFile.java
@@ -811,15 +811,40 @@ public static long fix(FileSystem fs, Path dir,
             (LongWritable.class));
     }
     try {
-      long pos = 0L;
+      /** What's the position (in bytes) we wrote when we got the last index */
+      long lastIndexPos = -1;
+      /**
+       * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+       * that we have an index at position zero - midKey will throw an exception
+       * if this is not the case
+       */
+      long lastIndexKeyCount = Long.MIN_VALUE;
+      long pos = dataReader.getPosition();
       LongWritable position = new LongWritable();
+      long nextBlock = pos;
+      boolean blockCompressed = dataReader.isBlockCompressed();
       while(dataReader.next(key, value)) {
-        cnt++;
-        if (cnt % indexInterval == 0) {
-          position.set(pos);
-          if (!dryrun) indexWriter.append(key, position);
+        if (blockCompressed) {
+          long curPos = dataReader.getPosition();
+          if (curPos > nextBlock) {
+            pos = nextBlock; // current block position
+            nextBlock = curPos;
+          }
         }
-        pos = dataReader.getPosition();
+        // Follow the same logic as in
+        // {@link MapFile.Writer#append(WritableComparable, Writable)}
+        if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
+          position.set(pos);
+          if (!dryrun) {
+            indexWriter.append(key, position);
+          }
+          lastIndexPos = pos;
+          lastIndexKeyCount = cnt;
+        }
+        if (!blockCompressed) {
+          pos = dataReader.getPosition(); // next record position
+        }
+        cnt++;
       }
     } catch(Throwable t) {
       // truncated data file. swallow it.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
index ff8df7cf06..7ec422732e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMapFile.java
@@ -485,6 +485,63 @@ public void testFix() {
       IOUtils.cleanup(null, writer);
     }
   }
+
+  /**
+   * test {@link MapFile#fix(FileSystem, Path, Class,
+   * Class, boolean, Configuration)}
+   * method in case of BLOCK compression
+   */
+  @Test
+  public void testFixBlockCompress() throws Exception {
+    final String indexLessMapFile = "testFixBlockCompress.mapfile";
+    final int compressBlocksize = 100;
+    final int indexInterval = 4;
+    final int noBlocks = 4;
+    final String value = "value-";
+    final int size = noBlocks * compressBlocksize / (4 + value.length());
+
+    conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+    MapFile.Writer.setIndexInterval(conf, indexInterval);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(TEST_DIR, indexLessMapFile);
+    MapFile.Writer writer = null;
+    MapFile.Reader reader = null;
+    try {
+      writer =
+          new MapFile.Writer(conf, dir,
+              MapFile.Writer.keyClass(IntWritable.class),
+              MapFile.Writer.valueClass(Text.class),
+              MapFile.Writer.compression(CompressionType.BLOCK));
+      for (int i = 0; i < size; i++) {
+        writer.append(new IntWritable(i), new Text(value + i));
+      }
+      writer.close();
+      Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+      fs.rename(index, index.suffix(".orig"));
+
+      assertEquals("No of valid MapFile entries wrong", size,
+          MapFile.fix(fs, dir, IntWritable.class, Text.class,
+              false, conf));
+      reader = new MapFile.Reader(dir, conf);
+      IntWritable key;
+      Text val = new Text();
+      int notFound = 0;
+      for (int i = 0; i < size; i++) {
+        key = new IntWritable(i);
+        if (null == reader.get(key, val)) {
+          notFound++;
+        }
+      }
+      assertEquals("With MapFile.fix-ed index, could not get entries # ",
+          0, notFound);
+    } finally {
+      IOUtils.cleanupWithLogger(null, writer, reader);
+      if (fs.exists(dir)) {
+        fs.delete(dir, true);
+      }
+    }
+  }
+
   /**
    * test all available constructor for {@code MapFile.Writer}
    */
@@ -619,7 +676,7 @@ public void testMainMethodMapFile() {
     } catch (Exception ex) {
       fail("testMainMethodMapFile error !!!");
     } finally {
-      IOUtils.cleanup(null, writer);
+      IOUtils.cleanupWithLogger(null, writer);
     }
   }
 