HADOOP-1381. The distance between sync blocks in SequenceFiles should be configurable rather than hard coded to 2000 bytes. Contributed by Harsh J.

This commit is contained in:
Harsh J 2016-10-26 20:04:33 +05:30
parent ee3d437a33
commit 07825f2b49
2 changed files with 146 additions and 41 deletions

View File

@ -24,6 +24,7 @@
import java.rmi.server.UID;
import java.security.MessageDigest;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.*;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
@ -146,7 +147,7 @@
* </ul>
* </li>
* <li>
* A sync-marker every few <code>100</code> bytes or so.
* A sync-marker every few <code>100</code> kilobytes or so.
* </li>
* </ul>
*
@ -165,7 +166,7 @@
* </ul>
* </li>
* <li>
* A sync-marker every few <code>100</code> bytes or so.
* A sync-marker every few <code>100</code> kilobytes or so.
* </li>
* </ul>
*
@ -217,8 +218,11 @@ private SequenceFile() {} // no public ctor
private static final int SYNC_HASH_SIZE = 16; // number of bytes in hash
private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
/** The number of bytes between sync points.*/
public static final int SYNC_INTERVAL = 100*SYNC_SIZE;
/**
* The number of bytes between sync points. 100 KB, default.
* Computed as 5 KB * 20 = 100 KB
*/
public static final int SYNC_INTERVAL = 5 * 1024 * SYNC_SIZE; // 5KB*(16+4)
/**
* The compression type used to compress key/value pairs in the
@ -856,6 +860,9 @@ public static class Writer implements java.io.Closeable, Syncable {
// starts and ends by scanning for this value.
long lastSyncPos; // position of last sync
byte[] sync; // 16 random bytes
@VisibleForTesting
int syncInterval;
{
try {
MessageDigest digester = MessageDigest.getInstance("MD5");
@ -988,6 +995,15 @@ private static Option filesystem(FileSystem fs) {
return new SequenceFile.Writer.FileSystemOption(fs);
}
private static class SyncIntervalOption extends Options.IntegerOption
implements Option {
SyncIntervalOption(int val) {
// If a negative sync interval is provided,
// fall back to the default sync interval.
super(val < 0 ? SYNC_INTERVAL : val);
}
}
public static Option bufferSize(int value) {
return new BufferSizeOption(value);
}
@ -1033,10 +1049,14 @@ public static Option compression(CompressionType value,
return new CompressionOption(value, codec);
}
public static Option syncInterval(int value) {
return new SyncIntervalOption(value);
}
/**
* Construct a uncompressed writer from a set of options.
* @param conf the configuration to use
* @param options the options used when creating the writer
* @param opts the options used when creating the writer
* @throws IOException if it fails
*/
Writer(Configuration conf,
@ -1062,6 +1082,8 @@ public static Option compression(CompressionType value,
Options.getOption(MetadataOption.class, opts);
CompressionOption compressionTypeOption =
Options.getOption(CompressionOption.class, opts);
SyncIntervalOption syncIntervalOption =
Options.getOption(SyncIntervalOption.class, opts);
// check consistency of options
if ((fileOption == null) == (streamOption == null)) {
throw new IllegalArgumentException("file or stream must be specified");
@ -1163,7 +1185,12 @@ public static Option compression(CompressionType value,
"GzipCodec without native-hadoop " +
"code!");
}
init(conf, out, ownStream, keyClass, valueClass, codec, metadata);
this.syncInterval = (syncIntervalOption == null) ?
SYNC_INTERVAL :
syncIntervalOption.getValue();
init(
conf, out, ownStream, keyClass, valueClass,
codec, metadata, syncInterval);
}
/** Create the named file.
@ -1176,7 +1203,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
Class keyClass, Class valClass) throws IOException {
this.compress = CompressionType.NONE;
init(conf, fs.create(name), true, keyClass, valClass, null,
new Metadata());
new Metadata(), SYNC_INTERVAL);
}
/** Create the named file with write-progress reporter.
@ -1190,7 +1217,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
Progressable progress, Metadata metadata) throws IOException {
this.compress = CompressionType.NONE;
init(conf, fs.create(name, progress), true, keyClass, valClass,
null, metadata);
null, metadata, SYNC_INTERVAL);
}
/** Create the named file with write-progress reporter.
@ -1206,7 +1233,7 @@ public Writer(FileSystem fs, Configuration conf, Path name,
this.compress = CompressionType.NONE;
init(conf,
fs.create(name, true, bufferSize, replication, blockSize, progress),
true, keyClass, valClass, null, metadata);
true, keyClass, valClass, null, metadata, SYNC_INTERVAL);
}
boolean isCompressed() { return compress != CompressionType.NONE; }
@ -1234,18 +1261,21 @@ private void writeFileHeader()
/** Initialize. */
@SuppressWarnings("unchecked")
void init(Configuration conf, FSDataOutputStream out, boolean ownStream,
Class keyClass, Class valClass,
CompressionCodec codec, Metadata metadata)
void init(Configuration config, FSDataOutputStream outStream,
boolean ownStream, Class key, Class val,
CompressionCodec compCodec, Metadata meta,
int syncIntervalVal)
throws IOException {
this.conf = conf;
this.out = out;
this.conf = config;
this.out = outStream;
this.ownOutputStream = ownStream;
this.keyClass = keyClass;
this.valClass = valClass;
this.codec = codec;
this.metadata = metadata;
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyClass = key;
this.valClass = val;
this.codec = compCodec;
this.metadata = meta;
this.syncInterval = syncIntervalVal;
SerializationFactory serializationFactory =
new SerializationFactory(config);
this.keySerializer = serializationFactory.getSerializer(keyClass);
if (this.keySerializer == null) {
throw new IOException(
@ -1366,7 +1396,7 @@ public synchronized void close() throws IOException {
synchronized void checkAndWriteSync() throws IOException {
if (sync != null &&
out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
out.getPos() >= lastSyncPos+this.syncInterval) { // time to emit sync
sync();
}
}

View File

@ -27,13 +27,15 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
/** Tests sync based seek reads/write intervals inside SequenceFiles. */
public class TestSequenceFileSync {
private static final int NUMRECORDS = 2000;
private static final int RECORDSIZE = 80;
private static final Random rand = new Random();
private static final Random RAND = new Random();
private final static String REC_FMT = "%d RECORDID %d : ";
@ -46,37 +48,110 @@ private static void forOffset(SequenceFile.Reader reader,
reader.next(key, val);
assertEquals(key.get(), expectedRecord);
final String test = String.format(REC_FMT, expectedRecord, expectedRecord);
assertEquals("Invalid value " + val, 0, val.find(test, 0));
assertEquals(
"Invalid value in iter " + iter + ": " + val,
0,
val.find(test, 0));
}
@Test
public void testDefaultSyncInterval() throws IOException {
// Uses the default sync interval of 100 KB
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.getLocal(conf);
final Path path = new Path(GenericTestUtils.getTempPath(
"sequencefile.sync.test"));
final IntWritable input = new IntWritable();
final Text val = new Text();
SequenceFile.Writer writer = new SequenceFile.Writer(
conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.compression(CompressionType.NONE),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class)
);
try {
writeSequenceFile(writer, NUMRECORDS*4);
for (int i = 0; i < 5; i++) {
final SequenceFile.Reader reader;
//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
final int buffersize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path),
SequenceFile.Reader.bufferSize(buffersize));
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
reader = new SequenceFile.Reader(conf,
SequenceFile.Reader.stream(in),
SequenceFile.Reader.start(0L),
SequenceFile.Reader.length(length));
}
try {
forOffset(reader, input, val, i, 0, 0);
forOffset(reader, input, val, i, 65, 0);
// There would be over 1000 records within
// this sync interval
forOffset(reader, input, val, i, 2000, 1101);
forOffset(reader, input, val, i, 0, 0);
} finally {
reader.close();
}
}
} finally {
fs.delete(path, false);
}
}
@Test
public void testLowSyncpoint() throws IOException {
// Uses a smaller sync interval of 2000 bytes
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.getLocal(conf);
final Path path = new Path(GenericTestUtils.getTempPath(
"sequencefile.sync.test"));
final IntWritable input = new IntWritable();
final Text val = new Text();
SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
IntWritable.class, Text.class);
SequenceFile.Writer writer = new SequenceFile.Writer(
conf,
SequenceFile.Writer.file(path),
SequenceFile.Writer.compression(CompressionType.NONE),
SequenceFile.Writer.keyClass(IntWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.syncInterval(20*100)
);
// Ensure the custom sync interval value is set
assertEquals(writer.syncInterval, 20*100);
try {
writeSequenceFile(writer, NUMRECORDS);
for (int i = 0; i < 5 ; i++) {
final SequenceFile.Reader reader;
for (int i = 0; i < 5; i++) {
final SequenceFile.Reader reader;
//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
reader = new SequenceFile.Reader(fs, path, conf);
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
final int buffersize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
}
//try different SequenceFile.Reader constructors
if (i % 2 == 0) {
final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
reader = new SequenceFile.Reader(
conf,
SequenceFile.Reader.file(path),
SequenceFile.Reader.bufferSize(bufferSize));
} else {
final FSDataInputStream in = fs.open(path);
final long length = fs.getFileStatus(path).getLen();
reader = new SequenceFile.Reader(
conf,
SequenceFile.Reader.stream(in),
SequenceFile.Reader.start(0L),
SequenceFile.Reader.length(length));
}
try {
try {
forOffset(reader, input, val, i, 0, 0);
forOffset(reader, input, val, i, 65, 0);
// There would be only a few records within
// this sync interval
forOffset(reader, input, val, i, 2000, 21);
forOffset(reader, input, val, i, 0, 0);
} finally {
@ -88,7 +163,7 @@ public void testLowSyncpoint() throws IOException {
}
}
public static void writeSequenceFile(SequenceFile.Writer writer,
private static void writeSequenceFile(SequenceFile.Writer writer,
int numRecords) throws IOException {
final IntWritable key = new IntWritable();
final Text val = new Text();
@ -100,13 +175,13 @@ public static void writeSequenceFile(SequenceFile.Writer writer,
writer.close();
}
static void randomText(Text val, int id, int recordSize) {
private static void randomText(Text val, int id, int recordSize) {
val.clear();
final StringBuilder ret = new StringBuilder(recordSize);
ret.append(String.format(REC_FMT, id, id));
recordSize -= ret.length();
for (int i = 0; i < recordSize; ++i) {
ret.append(rand.nextInt(9));
ret.append(RAND.nextInt(9));
}
val.set(ret.toString());
}