From c9cc61e8538925081b6ed92596e599d23c8a5542 Mon Sep 17 00:00:00 2001 From: Christopher Douglas Date: Fri, 11 Sep 2009 07:38:01 +0000 Subject: [PATCH] HADOOP-6196. Fix a bug in SequenceFile.Reader where syncing within the header would cause the reader to read the sync marker as a record. Contributed by Jay Booth git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@813698 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + .../org/apache/hadoop/io/SequenceFile.java | 10 ++ .../hadoop/io/TestSequenceFileSync.java | 107 ++++++++++++++++++ 3 files changed, 121 insertions(+) create mode 100644 src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java diff --git a/CHANGES.txt b/CHANGES.txt index 8eabd3e1b0..22541c2da6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1008,6 +1008,10 @@ Trunk (unreleased changes) HADOOP-6181. Fix .eclipse.templates/.classpath for avro and jets3t jar files. (Carlos Valiente via szetszwo) + HADOOP-6196. Fix a bug in SequenceFile.Reader where syncing within the + header would cause the reader to read the sync marker as a record. (Jay + Booth via cdouglas) + Release 0.20.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index ae494f948e..db77fa0ecf 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -1397,6 +1397,7 @@ public static class Reader implements java.io.Closeable { private byte[] syncCheck = new byte[SYNC_HASH_SIZE]; private boolean syncSeen; + private long headerEnd; private long end; private int keyLength; private int recordLength; @@ -1546,6 +1547,7 @@ private void init(boolean tempReader) throws IOException { if (version > 1) { // if version > 1 in.readFully(sync); // read sync bytes + headerEnd = in.getPos(); // record end of header } // Initialize... 
*not* if this we are constructing a temporary Reader @@ -2210,6 +2212,14 @@ public synchronized void sync(long position) throws IOException { return; } + if (position < headerEnd) { + // seek directly to first record + in.seek(headerEnd); + // note the sync marker "seen" in the header + syncSeen = true; + return; + } + try { seek(position+4); // skip escape in.readFully(syncCheck); diff --git a/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java new file mode 100644 index 0000000000..933ee29cfe --- /dev/null +++ b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static org.junit.Assert.*;
+
+/**
+ * Regression test for HADOOP-6196: calls {@link SequenceFile.Reader#sync}
+ * with low byte offsets and checks which record the following
+ * {@code next()} call returns.
+ */
+public class TestSequenceFileSync {
+  /** Number of records written to the test file. */
+  private static final int NUMRECORDS = 2000;
+  /** Target length, in characters, of every generated value. */
+  private static final int RECORDSIZE = 80;
+  /** Source of the filler digits; the digits themselves are never checked. */
+  private static final Random rand = new Random();
+
+  /** Prefix of every value; both placeholders receive the record id. */
+  private final static String REC_FMT = "%d RECORDID %d : ";
+
+  /**
+   * Syncs {@code reader} to {@code off}, reads one record and asserts that it
+   * is record {@code expectedRecord}.
+   *
+   * @param reader reader over the file produced by {@link #writeSequenceFile}
+   * @param key reusable holder for the record key
+   * @param val reusable holder for the record value; cleared before reading
+   * @param iter index of the current pass, reported in failure messages
+   * @param off byte offset handed to {@code sync}
+   * @param expectedRecord id the record read after the sync must carry
+   * @throws IOException if syncing or reading fails
+   */
+  private static void forOffset(SequenceFile.Reader reader,
+      IntWritable key, Text val, int iter, long off, int expectedRecord)
+      throws IOException {
+    final String where = " (pass " + iter + ", sync offset " + off + ")";
+    val.clear();
+    reader.sync(off);
+    // next() signals "no more records" through its return value; fail on that
+    // directly instead of comparing whatever the holders contained before.
+    assertTrue("No record after sync" + where, reader.next(key, val));
+    // JUnit order is (message, expected, actual).
+    assertEquals("Wrong record" + where, expectedRecord, key.get());
+    final String test = String.format(REC_FMT, expectedRecord, expectedRecord);
+    assertEquals("Invalid value " + val, 0, val.find(test, 0));
+  }
+
+  /**
+   * Writes {@link #NUMRECORDS} records to a local file, then runs five passes,
+   * each with a fresh reader, syncing to offsets 0, 65, 2000 and 0 again.
+   * Offsets 0 and 65 (presumably inside the file header -- confirm against
+   * the SequenceFile format) must yield record 0; offset 2000 must yield
+   * record 21. The file is deleted afterwards.
+   */
+  @Test
+  public void testLowSyncpoint() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem fs = FileSystem.getLocal(conf);
+    final Path path = new Path(System.getProperty("test.build.data", "/tmp"),
+        "sequencefile.sync.test");
+    final IntWritable input = new IntWritable();
+    final Text val = new Text();
+    try {
+      // Created inside the try so the file is removed even if construction
+      // or writing fails; writeSequenceFile() closes the writer.
+      final SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+          path, IntWritable.class, Text.class);
+      writeSequenceFile(writer, NUMRECORDS);
+      for (int i = 0; i < 5; i++) {
+        final SequenceFile.Reader reader =
+            new SequenceFile.Reader(fs, path, conf);
+        try {
+          forOffset(reader, input, val, i, 0, 0);
+          forOffset(reader, input, val, i, 65, 0);
+          forOffset(reader, input, val, i, 2000, 21);
+          forOffset(reader, input, val, i, 0, 0);
+        } finally {
+          reader.close();
+        }
+      }
+    } finally {
+      fs.delete(path, false);
+    }
+  }
+
+  /**
+   * Appends {@code numRecords} records to {@code writer} and closes it. Keys
+   * run from 0 to {@code numRecords - 1}; each value is produced by
+   * {@link #randomText} with {@link #RECORDSIZE}.
+   *
+   * @param writer destination; closed before this method returns or throws
+   * @param numRecords number of records to append
+   * @throws IOException if appending or closing fails
+   */
+  public static void writeSequenceFile(SequenceFile.Writer writer,
+      int numRecords) throws IOException {
+    final IntWritable key = new IntWritable();
+    final Text val = new Text();
+    try {
+      for (int numWritten = 0; numWritten < numRecords; ++numWritten) {
+        key.set(numWritten);
+        randomText(val, numWritten, RECORDSIZE);
+        writer.append(key, val);
+      }
+    } finally {
+      // Release the writer even when an append fails part-way through.
+      writer.close();
+    }
+  }
+
+  /**
+   * Replaces the contents of {@code val} with the {@link #REC_FMT} prefix for
+   * {@code id} followed by random digits in the range 0-8, so that the text
+   * is {@code recordSize} characters long. If the prefix alone is already
+   * that long, no digits are appended.
+   *
+   * @param val text to overwrite
+   * @param id record id placed in the prefix
+   * @param recordSize target length in characters; non-positive values
+   *        produce the bare prefix
+   */
+  static void randomText(Text val, int id, int recordSize) {
+    val.clear();
+    // Clamp the capacity hint: a negative capacity would make the
+    // StringBuilder constructor throw.
+    final StringBuilder ret = new StringBuilder(Math.max(recordSize, 0));
+    ret.append(String.format(REC_FMT, id, id));
+    final int padding = recordSize - ret.length();
+    for (int i = 0; i < padding; ++i) {
+      ret.append(rand.nextInt(9));
+    }
+    val.set(ret.toString());
+  }
+}