diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 2ac1389c04..c510ff78a0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -24,6 +24,7 @@ import java.rmi.server.UID; import java.security.MessageDigest; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.*; import org.apache.hadoop.util.Options; import org.apache.hadoop.fs.*; @@ -146,7 +147,7 @@ * * *
  • - * A sync-marker every few 100 bytes or so. + * A sync-marker every few 100 kilobytes or so. *
  • * * @@ -165,7 +166,7 @@ * * *
  • - * A sync-marker every few 100 bytes or so. + * A sync-marker every few 100 kilobytes or so. *
  • * * @@ -217,8 +218,11 @@ private SequenceFile() {} // no public ctor private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash - /** The number of bytes between sync points.*/ - public static final int SYNC_INTERVAL = 100*SYNC_SIZE; + /** + * The number of bytes between sync points. 100 KB, default. + * Computed as 5 KB * 20 = 100 KB + */ + public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4) /** * The compression type used to compress key/value pairs in the @@ -856,6 +860,9 @@ public static class Writer implements java.io.Closeable, Syncable { // starts and ends by scanning for this value. long lastSyncPos; // position of last sync byte[] sync; // 16 random bytes + @VisibleForTesting + int syncInterval; + { try { MessageDigest digester = MessageDigest.getInstance("MD5"); @@ -987,7 +994,16 @@ public static Option file(Path value) { private static Option filesystem(FileSystem fs) { return new SequenceFile.Writer.FileSystemOption(fs); } - + + private static class SyncIntervalOption extends Options.IntegerOption + implements Option { + SyncIntervalOption(int val) { + // If a negative sync interval is provided, + // fall back to the default sync interval. + super(val < 0 ? SYNC_INTERVAL : val); + } + } + public static Option bufferSize(int value) { return new BufferSizeOption(value); } @@ -1032,11 +1048,15 @@ public static Option compression(CompressionType value, CompressionCodec codec) { return new CompressionOption(value, codec); } - + + public static Option syncInterval(int value) { + return new SyncIntervalOption(value); + } + /** * Construct a uncompressed writer from a set of options. 
* @param conf the configuration to use - * @param options the options used when creating the writer + * @param opts the options used when creating the writer * @throws IOException if it fails */ Writer(Configuration conf, @@ -1062,6 +1082,8 @@ public static Option compression(CompressionType value, Options.getOption(MetadataOption.class, opts); CompressionOption compressionTypeOption = Options.getOption(CompressionOption.class, opts); + SyncIntervalOption syncIntervalOption = + Options.getOption(SyncIntervalOption.class, opts); // check consistency of options if ((fileOption == null) == (streamOption == null)) { throw new IllegalArgumentException("file or stream must be specified"); @@ -1163,7 +1185,12 @@ public static Option compression(CompressionType value, "GzipCodec without native-hadoop " + "code!"); } - init(conf, out, ownStream, keyClass, valueClass, codec, metadata); + this.syncInterval = (syncIntervalOption == null) ? + SYNC_INTERVAL : + syncIntervalOption.getValue(); + init( + conf, out, ownStream, keyClass, valueClass, + codec, metadata, syncInterval); } /** Create the named file. @@ -1176,7 +1203,7 @@ public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name), true, keyClass, valClass, null, - new Metadata()); + new Metadata(), SYNC_INTERVAL); } /** Create the named file with write-progress reporter. @@ -1190,7 +1217,7 @@ public Writer(FileSystem fs, Configuration conf, Path name, Progressable progress, Metadata metadata) throws IOException { this.compress = CompressionType.NONE; init(conf, fs.create(name, progress), true, keyClass, valClass, - null, metadata); + null, metadata, SYNC_INTERVAL); } /** Create the named file with write-progress reporter. 
@@ -1206,7 +1233,7 @@ public Writer(FileSystem fs, Configuration conf, Path name, this.compress = CompressionType.NONE; init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), - true, keyClass, valClass, null, metadata); + true, keyClass, valClass, null, metadata, SYNC_INTERVAL); } boolean isCompressed() { return compress != CompressionType.NONE; } @@ -1234,18 +1261,21 @@ private void writeFileHeader() /** Initialize. */ @SuppressWarnings("unchecked") - void init(Configuration conf, FSDataOutputStream out, boolean ownStream, - Class keyClass, Class valClass, - CompressionCodec codec, Metadata metadata) + void init(Configuration config, FSDataOutputStream outStream, + boolean ownStream, Class key, Class val, + CompressionCodec compCodec, Metadata meta, + int syncIntervalVal) throws IOException { - this.conf = conf; - this.out = out; + this.conf = config; + this.out = outStream; this.ownOutputStream = ownStream; - this.keyClass = keyClass; - this.valClass = valClass; - this.codec = codec; - this.metadata = metadata; - SerializationFactory serializationFactory = new SerializationFactory(conf); + this.keyClass = key; + this.valClass = val; + this.codec = compCodec; + this.metadata = meta; + this.syncInterval = syncIntervalVal; + SerializationFactory serializationFactory = + new SerializationFactory(config); this.keySerializer = serializationFactory.getSerializer(keyClass); if (this.keySerializer == null) { throw new IOException( @@ -1366,7 +1396,7 @@ public synchronized void close() throws IOException { synchronized void checkAndWriteSync() throws IOException { if (sync != null && - out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync + out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync sync(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java index 
bceb8aff2e..363177b464 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestSequenceFileSync.java @@ -27,13 +27,15 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; +/** Tests sync based seek reads/write intervals inside SequenceFiles. */ public class TestSequenceFileSync { private static final int NUMRECORDS = 2000; private static final int RECORDSIZE = 80; - private static final Random rand = new Random(); + private static final Random RAND = new Random(); private final static String REC_FMT = "%d RECORDID %d : "; @@ -46,37 +48,110 @@ private static void forOffset(SequenceFile.Reader reader, reader.next(key, val); assertEquals(key.get(), expectedRecord); final String test = String.format(REC_FMT, expectedRecord, expectedRecord); - assertEquals("Invalid value " + val, 0, val.find(test, 0)); + assertEquals( + "Invalid value in iter " + iter + ": " + val, + 0, + val.find(test, 0)); + } + + @Test + public void testDefaultSyncInterval() throws IOException { + // Uses the default sync interval of 100 KB + final Configuration conf = new Configuration(); + final FileSystem fs = FileSystem.getLocal(conf); + final Path path = new Path(GenericTestUtils.getTempPath( + "sequencefile.sync.test")); + final IntWritable input = new IntWritable(); + final Text val = new Text(); + SequenceFile.Writer writer = new SequenceFile.Writer( + conf, + SequenceFile.Writer.file(path), + SequenceFile.Writer.compression(CompressionType.NONE), + SequenceFile.Writer.keyClass(IntWritable.class), + SequenceFile.Writer.valueClass(Text.class) + ); + try { + writeSequenceFile(writer, NUMRECORDS*4); + for (int i = 0; i < 5; i++) { + final SequenceFile.Reader reader; 
+ + //try different SequenceFile.Reader constructors + if (i % 2 == 0) { + final int buffersize = conf.getInt("io.file.buffer.size", 4096); + reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(path), + SequenceFile.Reader.bufferSize(buffersize)); + } else { + final FSDataInputStream in = fs.open(path); + final long length = fs.getFileStatus(path).getLen(); + reader = new SequenceFile.Reader(conf, + SequenceFile.Reader.stream(in), + SequenceFile.Reader.start(0L), + SequenceFile.Reader.length(length)); + } + + try { + forOffset(reader, input, val, i, 0, 0); + forOffset(reader, input, val, i, 65, 0); + // There would be over 1000 records within + // this sync interval + forOffset(reader, input, val, i, 2000, 1101); + forOffset(reader, input, val, i, 0, 0); + } finally { + reader.close(); + } + } + } finally { + fs.delete(path, false); + } } @Test public void testLowSyncpoint() throws IOException { + // Uses a smaller sync interval of 2000 bytes final Configuration conf = new Configuration(); final FileSystem fs = FileSystem.getLocal(conf); final Path path = new Path(GenericTestUtils.getTempPath( "sequencefile.sync.test")); final IntWritable input = new IntWritable(); final Text val = new Text(); - SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, - IntWritable.class, Text.class); + SequenceFile.Writer writer = new SequenceFile.Writer( + conf, + SequenceFile.Writer.file(path), + SequenceFile.Writer.compression(CompressionType.NONE), + SequenceFile.Writer.keyClass(IntWritable.class), + SequenceFile.Writer.valueClass(Text.class), + SequenceFile.Writer.syncInterval(20*100) + ); + // Ensure the custom sync interval value is set + assertEquals(20*100, writer.syncInterval); try { writeSequenceFile(writer, NUMRECORDS); - for (int i = 0; i < 5 ; i++) { - final SequenceFile.Reader reader; + for (int i = 0; i < 5; i++) { + final SequenceFile.Reader reader; - //try different SequenceFile.Reader constructors - if (i % 2 == 0) { - reader = new
SequenceFile.Reader(fs, path, conf); - } else { - final FSDataInputStream in = fs.open(path); - final long length = fs.getFileStatus(path).getLen(); - final int buffersize = conf.getInt("io.file.buffer.size", 4096); - reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf); - } + //try different SequenceFile.Reader constructors + if (i % 2 == 0) { + final int bufferSize = conf.getInt("io.file.buffer.size", 4096); + reader = new SequenceFile.Reader( + conf, + SequenceFile.Reader.file(path), + SequenceFile.Reader.bufferSize(bufferSize)); + } else { + final FSDataInputStream in = fs.open(path); + final long length = fs.getFileStatus(path).getLen(); + reader = new SequenceFile.Reader( + conf, + SequenceFile.Reader.stream(in), + SequenceFile.Reader.start(0L), + SequenceFile.Reader.length(length)); + } - try { + try { forOffset(reader, input, val, i, 0, 0); forOffset(reader, input, val, i, 65, 0); + // There would be only a few records within + // this sync interval forOffset(reader, input, val, i, 2000, 21); forOffset(reader, input, val, i, 0, 0); } finally { @@ -88,7 +163,7 @@ public void testLowSyncpoint() throws IOException { } } - public static void writeSequenceFile(SequenceFile.Writer writer, + private static void writeSequenceFile(SequenceFile.Writer writer, int numRecords) throws IOException { final IntWritable key = new IntWritable(); final Text val = new Text(); @@ -100,13 +175,13 @@ public static void writeSequenceFile(SequenceFile.Writer writer, writer.close(); } - static void randomText(Text val, int id, int recordSize) { + private static void randomText(Text val, int id, int recordSize) { val.clear(); final StringBuilder ret = new StringBuilder(recordSize); ret.append(String.format(REC_FMT, id, id)); recordSize -= ret.length(); for (int i = 0; i < recordSize; ++i) { - ret.append(rand.nextInt(9)); + ret.append(RAND.nextInt(9)); } val.set(ret.toString()); }