MapFile.fix creates a wrong index file in the case of a block-compressed data file. Contributed by Grigori Rybkine

Chris Douglas 2018-01-26 09:06:48 -08:00
parent 8b5b045bd2
commit 56872cff92
2 changed files with 89 additions and 7 deletions


@@ -811,15 +811,40 @@ public static long fix(FileSystem fs, Path dir,
           (LongWritable.class));
     }
     try {
-      long pos = 0L;
+      /** What's the position (in bytes) we wrote when we got the last index */
+      long lastIndexPos = -1;
+      /**
+       * What was size when we last wrote an index. Set to MIN_VALUE to ensure
+       * that we have an index at position zero - midKey will throw an exception
+       * if this is not the case
+       */
+      long lastIndexKeyCount = Long.MIN_VALUE;
+      long pos = dataReader.getPosition();
       LongWritable position = new LongWritable();
+      long nextBlock = pos;
+      boolean blockCompressed = dataReader.isBlockCompressed();
       while(dataReader.next(key, value)) {
-        cnt++;
-        if (cnt % indexInterval == 0) {
-          position.set(pos);
-          if (!dryrun) indexWriter.append(key, position);
+        if (blockCompressed) {
+          long curPos = dataReader.getPosition();
+          if (curPos > nextBlock) {
+            pos = nextBlock;                       // current block position
+            nextBlock = curPos;
+          }
         }
-        pos = dataReader.getPosition();
+        // Follow the same logic as in
+        // {@link MapFile.Writer#append(WritableComparable, Writable)}
+        if (cnt >= lastIndexKeyCount + indexInterval && pos > lastIndexPos) {
+          position.set(pos);
+          if (!dryrun) {
+            indexWriter.append(key, position);
+          }
+          lastIndexPos = pos;
+          lastIndexKeyCount = cnt;
+        }
+        if (!blockCompressed) {
+          pos = dataReader.getPosition(); // next record position
+        }
+        cnt++;
       }
     } catch(Throwable t) {
       // truncated data file. swallow it.
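For context (not part of the patch itself), a minimal sketch of how the repaired fix() entry point can be invoked to rebuild a missing or corrupt index. The directory path below is a hypothetical example, and the key/value classes must match whatever the data file was actually written with; the test diff that follows exercises this same call against a BLOCK-compressed file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class RebuildMapFileIndex {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    // Hypothetical MapFile directory whose "index" file is missing or corrupt.
    Path dir = new Path("/tmp/example.mapfile");

    // dryrun = false actually writes a fresh index; the return value is the
    // number of valid entries MapFile.fix found while scanning the data file.
    long entries = MapFile.fix(fs, dir, IntWritable.class, Text.class, false, conf);
    System.out.println("Rebuilt index covering " + entries + " entries");
  }
}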


@@ -485,6 +485,63 @@ public void testFix() {
       IOUtils.cleanup(null, writer);
     }
   }
+
+  /**
+   * test {@link MapFile#fix(FileSystem, Path, Class<? extends Writable>,
+   * Class<? extends Writable>, boolean, Configuration)}
+   * method in case of BLOCK compression
+   */
+  @Test
+  public void testFixBlockCompress() throws Exception {
+    final String indexLessMapFile = "testFixBlockCompress.mapfile";
+    final int compressBlocksize = 100;
+    final int indexInterval = 4;
+    final int noBlocks = 4;
+    final String value = "value-";
+    final int size = noBlocks * compressBlocksize / (4 + value.length());
+
+    conf.setInt("io.seqfile.compress.blocksize", compressBlocksize);
+    MapFile.Writer.setIndexInterval(conf, indexInterval);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path dir = new Path(TEST_DIR, indexLessMapFile);
+    MapFile.Writer writer = null;
+    MapFile.Reader reader = null;
+    try {
+      writer =
+          new MapFile.Writer(conf, dir,
+              MapFile.Writer.keyClass(IntWritable.class),
+              MapFile.Writer.valueClass(Text.class),
+              MapFile.Writer.compression(CompressionType.BLOCK));
+      for (int i = 0; i < size; i++) {
+        writer.append(new IntWritable(i), new Text(value + i));
+      }
+      writer.close();
+      Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
+      fs.rename(index, index.suffix(".orig"));
+      assertEquals("No of valid MapFile entries wrong", size,
+          MapFile.fix(fs, dir, IntWritable.class, Text.class, false, conf));
+      reader = new MapFile.Reader(dir, conf);
+      IntWritable key;
+      Text val = new Text();
+      int notFound = 0;
+      for (int i = 0; i < size; i++) {
+        key = new IntWritable(i);
+        if (null == reader.get(key, val)) {
+          notFound++;
+        }
+      }
+      assertEquals("With MapFile.fix-ed index, could not get entries # ",
+          0, notFound);
+    } finally {
+      IOUtils.cleanupWithLogger(null, writer, reader);
+      if (fs.exists(dir)) {
+        fs.delete(dir, true);
+      }
+    }
+  }
+
   /**
    * test all available constructor for {@code MapFile.Writer}
    */
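A note on the sizing in testFixBlockCompress (an interpretation, not stated in the commit): with io.seqfile.compress.blocksize lowered to 100 bytes, the SequenceFile writer flushes a compressed block after roughly every 100 bytes of raw key/value data, so the ~40 records written by the loop span several blocks and the rebuilt index must point past the first block - exactly the case the old per-record position logic got wrong. A minimal standalone sketch of the same setup, with a hypothetical output path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class WriteBlockCompressedMapFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Tiny block size so even a few hundred bytes of data span several blocks.
    conf.setInt("io.seqfile.compress.blocksize", 100);
    // Add an index entry for every 4th key, as the test does.
    MapFile.Writer.setIndexInterval(conf, 4);

    Path dir = new Path("/tmp/block-compressed.mapfile");  // hypothetical location
    MapFile.Writer writer = new MapFile.Writer(conf, dir,
        MapFile.Writer.keyClass(IntWritable.class),
        MapFile.Writer.valueClass(Text.class),
        MapFile.Writer.compression(CompressionType.BLOCK));
    try {
      for (int i = 0; i < 40; i++) {
        writer.append(new IntWritable(i), new Text("value-" + i));
      }
    } finally {
      writer.close();
    }
  }
}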
@@ -619,7 +676,7 @@ public void testMainMethodMapFile() {
     } catch (Exception ex) {
       fail("testMainMethodMapFile error !!!");
     } finally {
-      IOUtils.cleanup(null, writer);
+      IOUtils.cleanupWithLogger(null, writer);
     }
   }