From 6333b3e485dc76a7505bf74e041e274e0a8e6faf Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Thu, 30 Sep 2010 02:59:32 +0000 Subject: [PATCH] HADOOP-6856. Simplify constructors for SequenceFile, and MapFile. (omalley) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1002937 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + src/java/org/apache/hadoop/fs/FsShell.java | 3 +- src/java/org/apache/hadoop/io/ArrayFile.java | 14 +- .../org/apache/hadoop/io/BloomMapFile.java | 92 +- src/java/org/apache/hadoop/io/MapFile.java | 319 ++++-- .../org/apache/hadoop/io/SequenceFile.java | 998 +++++++++++------- src/java/org/apache/hadoop/io/SetFile.java | 7 +- .../security/RefreshUserMappingsProtocol.java | 1 - 8 files changed, 890 insertions(+), 546 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0590fa7e4b..06b808175e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -137,6 +137,8 @@ Trunk (unreleased changes) HADOOP-6965. Introduces checks for whether the original tgt is valid in the reloginFromKeytab method. + HADOOP-6856. Simplify constructors for SequenceFile, and MapFile. (omalley) + OPTIMIZATIONS HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..). 
diff --git a/src/java/org/apache/hadoop/fs/FsShell.java b/src/java/org/apache/hadoop/fs/FsShell.java index cbf8823476..7eb99ad67c 100644 --- a/src/java/org/apache/hadoop/fs/FsShell.java +++ b/src/java/org/apache/hadoop/fs/FsShell.java @@ -372,7 +372,8 @@ private class TextRecordInputStream extends InputStream { public TextRecordInputStream(FileStatus f) throws IOException { final Path fpath = f.getPath(); final Configuration lconf = getConf(); - r = new SequenceFile.Reader(fpath.getFileSystem(lconf), fpath, lconf); + r = new SequenceFile.Reader(lconf, + SequenceFile.Reader.file(fpath)); key = ReflectionUtils.newInstance( r.getKeyClass().asSubclass(WritableComparable.class), lconf); val = ReflectionUtils.newInstance( diff --git a/src/java/org/apache/hadoop/io/ArrayFile.java b/src/java/org/apache/hadoop/io/ArrayFile.java index ca72f69d56..a0ab2422ba 100644 --- a/src/java/org/apache/hadoop/io/ArrayFile.java +++ b/src/java/org/apache/hadoop/io/ArrayFile.java @@ -42,7 +42,8 @@ public static class Writer extends MapFile.Writer { public Writer(Configuration conf, FileSystem fs, String file, Class valClass) throws IOException { - super(conf, fs, file, LongWritable.class, valClass); + super(conf, new Path(file), keyClass(LongWritable.class), + valueClass(valClass)); } /** Create the named file for values of the named class. */ @@ -50,7 +51,11 @@ public Writer(Configuration conf, FileSystem fs, String file, Class valClass, CompressionType compress, Progressable progress) throws IOException { - super(conf, fs, file, LongWritable.class, valClass, compress, progress); + super(conf, new Path(file), + keyClass(LongWritable.class), + valueClass(valClass), + compressionType(compress), + progressable(progress)); } /** Append a value to the file. 
*/ @@ -65,8 +70,9 @@ public static class Reader extends MapFile.Reader { private LongWritable key = new LongWritable(); /** Construct an array reader for the named file.*/ - public Reader(FileSystem fs, String file, Configuration conf) throws IOException { - super(fs, file, conf); + public Reader(FileSystem fs, String file, + Configuration conf) throws IOException { + super(new Path(file), conf); } /** Positions the reader before its nth value. */ diff --git a/src/java/org/apache/hadoop/io/BloomMapFile.java b/src/java/org/apache/hadoop/io/BloomMapFile.java index 00084ea7ef..d1431e0477 100644 --- a/src/java/org/apache/hadoop/io/BloomMapFile.java +++ b/src/java/org/apache/hadoop/io/BloomMapFile.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.util.Options; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.bloom.DynamicBloomFilter; import org.apache.hadoop.util.bloom.Filter; @@ -82,78 +83,80 @@ public static class Writer extends MapFile.Writer { private FileSystem fs; private Path dir; + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { - super(conf, fs, dirName, keyClass, valClass, compress, codec, progress); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), + compressionType(compress), compressionCodec(codec), + progressable(progress)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress, Progressable progress) throws IOException { - super(conf, fs, dirName, keyClass, valClass, compress, progress); - this.fs = fs; - this.dir = new Path(dirName); - 
initBloomFilter(conf); + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), + compressionType(compress), progressable(progress)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress) throws IOException { - super(conf, fs, dirName, keyClass, valClass, compress); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), + compressionType(compress)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, CompressionType compress, CompressionCodec codec, Progressable progress) throws IOException { - super(conf, fs, dirName, comparator, valClass, compress, codec, progress); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress), + compressionCodec(codec), progressable(progress)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, CompressionType compress, Progressable progress) throws IOException { - super(conf, fs, dirName, comparator, valClass, compress, progress); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress), + progressable(progress)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, CompressionType compress) throws IOException { - super(conf, fs, dirName, comparator, valClass, compress); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress)); } + @Deprecated public 
Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass) throws IOException { - super(conf, fs, dirName, comparator, valClass); - this.fs = fs; - this.dir = new Path(dirName); - initBloomFilter(conf); + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass)); } + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, - Class keyClass, - Class valClass) throws IOException { - super(conf, fs, dirName, keyClass, valClass); - this.fs = fs; - this.dir = new Path(dirName); + Class keyClass, + Class valClass) throws IOException { + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); + } + + public Writer(Configuration conf, Path dir, + SequenceFile.Writer.Option... options) throws IOException { + super(conf, dir, options); + this.fs = dir.getFileSystem(conf); + this.dir = dir; initBloomFilter(conf); } @@ -197,27 +200,34 @@ public static class Reader extends MapFile.Reader { private DataOutputBuffer buf = new DataOutputBuffer(); private Key bloomKey = new Key(); + public Reader(Path dir, Configuration conf, + SequenceFile.Reader.Option... 
options) throws IOException { + super(dir, conf, options); + initBloomFilter(dir, conf); + } + + @Deprecated public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException { - super(fs, dirName, conf); - initBloomFilter(fs, dirName, conf); + this(new Path(dirName), conf); } + @Deprecated public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf, boolean open) throws IOException { - super(fs, dirName, comparator, conf, open); - initBloomFilter(fs, dirName, conf); + this(new Path(dirName), conf, comparator(comparator)); } + @Deprecated public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf) throws IOException { - super(fs, dirName, comparator, conf); - initBloomFilter(fs, dirName, conf); + this(new Path(dirName), conf, comparator(comparator)); } - private void initBloomFilter(FileSystem fs, String dirName, - Configuration conf) { + private void initBloomFilter(Path dirName, + Configuration conf) { try { + FileSystem fs = dirName.getFileSystem(conf); DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME)); bloomFilter = new DynamicBloomFilter(); bloomFilter.readFields(in); diff --git a/src/java/org/apache/hadoop/io/MapFile.java b/src/java/org/apache/hadoop/io/MapFile.java index e1d03193d5..105b62763b 100644 --- a/src/java/org/apache/hadoop/io/MapFile.java +++ b/src/java/org/apache/hadoop/io/MapFile.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.Options; import org.apache.hadoop.fs.*; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -31,6 +32,8 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Reader; +import org.apache.hadoop.io.SequenceFile.Writer; import 
org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; @@ -91,94 +94,194 @@ public static class Writer implements java.io.Closeable { private long lastIndexKeyCount = Long.MIN_VALUE; - /** Create the named map for keys of the named class. */ + /** Create the named map for keys of the named class. + * @deprecated Use Writer(Configuration, Path, Option...) instead. + */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, - Class keyClass, Class valClass) - throws IOException { - this(conf, fs, dirName, - WritableComparator.get(keyClass), valClass, - SequenceFile.getCompressionType(conf)); + Class keyClass, + Class valClass) throws IOException { + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass)); } - /** Create the named map for keys of the named class. */ + /** Create the named map for keys of the named class. + * @deprecated Use Writer(Configuration, Path, Option...) instead. + */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, - CompressionType compress, Progressable progress) - throws IOException { - this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, - compress, progress); + CompressionType compress, + Progressable progress) throws IOException { + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), + compressionType(compress), progressable(progress)); } - /** Create the named map for keys of the named class. */ + /** Create the named map for keys of the named class. + * @deprecated Use Writer(Configuration, Path, Option...) instead. 
+ */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, CompressionType compress, CompressionCodec codec, - Progressable progress) - throws IOException { - this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, - compress, codec, progress); + Progressable progress) throws IOException { + this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass), + compressionType(compress), compressionCodec(codec), + progressable(progress)); } - /** Create the named map for keys of the named class. */ + /** Create the named map for keys of the named class. + * @deprecated Use Writer(Configuration, Path, Option...) instead. + */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, Class keyClass, Class valClass, - CompressionType compress) - throws IOException { - this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress); + CompressionType compress) throws IOException { + this(conf, new Path(dirName), keyClass(keyClass), + valueClass(valClass), compressionType(compress)); } - /** Create the named map using the named key comparator. */ + /** Create the named map using the named key comparator. + * @deprecated Use Writer(Configuration, Path, Option...) instead. + */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, - WritableComparator comparator, Class valClass) - throws IOException { - this(conf, fs, dirName, comparator, valClass, - SequenceFile.getCompressionType(conf)); + WritableComparator comparator, Class valClass + ) throws IOException { + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass)); } - /** Create the named map using the named key comparator. */ + + /** Create the named map using the named key comparator. + * @deprecated Use Writer(Configuration, Path, Option...) instead. 
+ */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, - SequenceFile.CompressionType compress) - throws IOException { - this(conf, fs, dirName, comparator, valClass, compress, null); + SequenceFile.CompressionType compress) throws IOException { + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress)); } - /** Create the named map using the named key comparator. */ + + /** Create the named map using the named key comparator. + * @deprecated Use Writer(Configuration, Path, Option...)} instead. + */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, SequenceFile.CompressionType compress, - Progressable progress) - throws IOException { - this(conf, fs, dirName, comparator, valClass, - compress, new DefaultCodec(), progress); + Progressable progress) throws IOException { + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress), + progressable(progress)); } - /** Create the named map using the named key comparator. */ + + /** Create the named map using the named key comparator. + * @deprecated Use Writer(Configuration, Path, Option...) instead. 
+ */ + @Deprecated public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, Class valClass, SequenceFile.CompressionType compress, CompressionCodec codec, - Progressable progress) - throws IOException { - - this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval); - - this.comparator = comparator; - this.lastKey = comparator.newKey(); - - Path dir = new Path(dirName); - if (!fs.mkdirs(dir)) { - throw new IOException("Mkdirs failed to create directory " + dir.toString()); - } - Path dataFile = new Path(dir, DATA_FILE_NAME); - Path indexFile = new Path(dir, INDEX_FILE_NAME); - - Class keyClass = comparator.getKeyClass(); - this.data = - SequenceFile.createWriter - (fs, conf, dataFile, keyClass, valClass, compress, codec, progress); - this.index = - SequenceFile.createWriter - (fs, conf, indexFile, keyClass, LongWritable.class, - CompressionType.BLOCK, progress); + Progressable progress) throws IOException { + this(conf, new Path(dirName), comparator(comparator), + valueClass(valClass), compressionType(compress), + compressionCodec(codec), progressable(progress)); } + // our options are a superset of sequence file writer options + public static interface Option extends SequenceFile.Writer.Option { } + + private static class KeyClassOption extends Options.ClassOption + implements Option { + KeyClassOption(Class value) { + super(value); + } + } + + private static class ComparatorOption implements Option { + private final WritableComparator value; + ComparatorOption(WritableComparator value) { + this.value = value; + } + WritableComparator getValue() { + return value; + } + } + + public static Option keyClass(Class value) { + return new KeyClassOption(value); + } + + public static Option comparator(WritableComparator value) { + return new ComparatorOption(value); + } + + public static SequenceFile.Writer.Option valueClass(Class value) { + return SequenceFile.Writer.valueClass(value); + } + + public static + 
SequenceFile.Writer.Option compressionType(CompressionType value) { + return SequenceFile.Writer.compressionType(value); + } + + public static + SequenceFile.Writer.Option compressionCodec(CompressionCodec value) { + return SequenceFile.Writer.compressionCodec(value); + } + + public static SequenceFile.Writer.Option progressable(Progressable value) { + return SequenceFile.Writer.progressable(value); + } + + @SuppressWarnings("unchecked") + public Writer(Configuration conf, + Path dirName, + SequenceFile.Writer.Option... opts + ) throws IOException { + KeyClassOption keyClassOption = + Options.getOption(KeyClassOption.class, opts); + ComparatorOption comparatorOption = + Options.getOption(ComparatorOption.class, opts); + if ((keyClassOption == null) == (comparatorOption == null)) { + throw new IllegalArgumentException("key class or comparator option " + + "must be set"); + } + this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval); + + Class keyClass; + if (keyClassOption == null) { + this.comparator = comparatorOption.getValue(); + keyClass = comparator.getKeyClass(); + } else { + keyClass= + (Class) keyClassOption.getValue(); + this.comparator = WritableComparator.get(keyClass); + } + this.lastKey = comparator.newKey(); + FileSystem fs = dirName.getFileSystem(conf); + + if (!fs.mkdirs(dirName)) { + throw new IOException("Mkdirs failed to create directory " + dirName); + } + Path dataFile = new Path(dirName, DATA_FILE_NAME); + Path indexFile = new Path(dirName, INDEX_FILE_NAME); + + SequenceFile.Writer.Option[] dataOptions = + Options.prependOptions(opts, + SequenceFile.Writer.file(dataFile), + SequenceFile.Writer.keyClass(keyClass)); + this.data = SequenceFile.createWriter(conf, dataOptions); + + SequenceFile.Writer.Option[] indexOptions = + Options.prependOptions(opts, + SequenceFile.Writer.file(indexFile), + SequenceFile.Writer.keyClass(keyClass), + SequenceFile.Writer.valueClass(LongWritable.class), + 
SequenceFile.Writer.compressionType(CompressionType.BLOCK)); + this.index = SequenceFile.createWriter(conf, indexOptions); + } + /** The number of entries that are added before an index entry is added.*/ public int getIndexInterval() { return indexInterval; } @@ -269,58 +372,86 @@ public static class Reader implements java.io.Closeable { /** Returns the class of values in this file. */ public Class getValueClass() { return data.getValueClass(); } - /** Construct a map reader for the named map.*/ - public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException { - this(fs, dirName, null, conf); - INDEX_SKIP = conf.getInt("io.map.index.skip", 0); + public static interface Option extends SequenceFile.Reader.Option {} + + public static Option comparator(WritableComparator value) { + return new ComparatorOption(value); } - /** Construct a map reader for the named map using the named comparator.*/ - public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf) - throws IOException { - this(fs, dirName, comparator, conf, true); - } - - /** - * Hook to allow subclasses to defer opening streams until further - * initialization is complete. - * @see #createDataFileReader(FileSystem, Path, Configuration) - */ - protected Reader(FileSystem fs, String dirName, - WritableComparator comparator, Configuration conf, boolean open) - throws IOException { - - if (open) { - open(fs, dirName, comparator, conf); + static class ComparatorOption implements Option { + private final WritableComparator value; + ComparatorOption(WritableComparator value) { + this.value = value; + } + WritableComparator getValue() { + return value; } } + + public Reader(Path dir, Configuration conf, + SequenceFile.Reader.Option... opts) throws IOException { + ComparatorOption comparatorOption = + Options.getOption(ComparatorOption.class, opts); + WritableComparator comparator = + comparatorOption == null ? 
null : comparatorOption.getValue(); + INDEX_SKIP = conf.getInt("io.map.index.skip", 0); + open(dir, comparator, conf, opts); + } + + /** Construct a map reader for the named map. + * @deprecated + */ + @Deprecated + public Reader(FileSystem fs, String dirName, + Configuration conf) throws IOException { + this(new Path(dirName), conf); + } + + /** Construct a map reader for the named map using the named comparator. + * @deprecated + */ + @Deprecated + public Reader(FileSystem fs, String dirName, WritableComparator comparator, + Configuration conf) throws IOException { + this(new Path(dirName), conf, comparator(comparator)); + } - protected synchronized void open(FileSystem fs, String dirName, - WritableComparator comparator, Configuration conf) throws IOException { - Path dir = new Path(dirName); + protected synchronized void open(Path dir, + WritableComparator comparator, + Configuration conf, + SequenceFile.Reader.Option... options + ) throws IOException { Path dataFile = new Path(dir, DATA_FILE_NAME); Path indexFile = new Path(dir, INDEX_FILE_NAME); // open the data - this.data = createDataFileReader(fs, dataFile, conf); + this.data = createDataFileReader(dataFile, conf, options); this.firstPosition = data.getPosition(); if (comparator == null) - this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class)); + this.comparator = + WritableComparator.get(data.getKeyClass(). + asSubclass(WritableComparable.class)); else this.comparator = comparator; // open the index - this.index = new SequenceFile.Reader(fs, indexFile, conf); + SequenceFile.Reader.Option[] indexOptions = + Options.prependOptions(options, SequenceFile.Reader.file(indexFile)); + this.index = new SequenceFile.Reader(conf, indexOptions); } /** * Override this method to specialize the type of * {@link SequenceFile.Reader} returned. 
*/ - protected SequenceFile.Reader createDataFileReader(FileSystem fs, - Path dataFile, Configuration conf) throws IOException { - return new SequenceFile.Reader(fs, dataFile, conf); + protected SequenceFile.Reader + createDataFileReader(Path dataFile, Configuration conf, + SequenceFile.Reader.Option... options + ) throws IOException { + SequenceFile.Reader.Option[] newOptions = + Options.prependOptions(options, SequenceFile.Reader.file(dataFile)); + return new SequenceFile.Reader(conf, newOptions); } private void readIndex() throws IOException { @@ -650,7 +781,8 @@ public static long fix(FileSystem fs, Path dir, // no fixing needed return -1; } - SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf); + SequenceFile.Reader dataReader = + new SequenceFile.Reader(conf, SequenceFile.Reader.file(data)); if (!dataReader.getKeyClass().equals(keyClass)) { throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() + ", got " + dataReader.getKeyClass().getName()); @@ -663,7 +795,14 @@ public static long fix(FileSystem fs, Path dir, Writable key = ReflectionUtils.newInstance(keyClass, conf); Writable value = ReflectionUtils.newInstance(valueClass, conf); SequenceFile.Writer indexWriter = null; - if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class); + if (!dryrun) { + indexWriter = + SequenceFile.createWriter(conf, + SequenceFile.Writer.file(index), + SequenceFile.Writer.keyClass(keyClass), + SequenceFile.Writer.valueClass + (LongWritable.class)); + } try { long pos = 0L; LongWritable position = new LongWritable(); diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index a2eab7d5c6..f66acf7532 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -23,6 +23,7 @@ import java.rmi.server.UID; import java.security.MessageDigest; import org.apache.commons.logging.*; 
+import org.apache.hadoop.util.Options; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -226,24 +227,50 @@ public static enum CompressionType { * @param job the job config to look in * @return the kind of compression to use */ - @Deprecated - static public CompressionType getCompressionType(Configuration job) { + static public CompressionType getDefaultCompressionType(Configuration job) { String name = job.get("io.seqfile.compression.type"); return name == null ? CompressionType.RECORD : CompressionType.valueOf(name); } /** - * Set the compression type for sequence files. + * Set the default compression type for sequence files. * @param job the configuration to modify * @param val the new compression type (none, block, record) */ - @Deprecated - static public void setCompressionType(Configuration job, - CompressionType val) { + static public void setDefaultCompressionType(Configuration job, + CompressionType val) { job.set("io.seqfile.compression.type", val.toString()); } - + + /** + * Create a new Writer with the given options. + * @param conf the configuration to use + * @param opts the options to create the file with + * @return a new Writer + * @throws IOException + */ + public static Writer createWriter(Configuration conf, Writer.Option... opts + ) throws IOException { + Writer.CompressionTypeOption compressionOption = + Options.getOption(Writer.CompressionTypeOption.class, opts); + CompressionType kind; + if (compressionOption != null) { + kind = compressionOption.getValue(); + } else { + kind = getDefaultCompressionType(conf); + } + switch (kind) { + default: + case NONE: + return new Writer(conf, kind, opts); + case RECORD: + return new RecordCompressWriter(conf, kind, opts); + case BLOCK: + return new BlockCompressWriter(conf, kind, opts); + } + } + /** * Construct the preferred type of SequenceFile Writer. * @param fs The configured filesystem. 
@@ -253,13 +280,15 @@ static public void setCompressionType(Configuration job, * @param valClass The 'value' type. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. */ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) - throws IOException { - return createWriter(fs, conf, name, keyClass, valClass, - getCompressionType(conf)); + Class keyClass, Class valClass) throws IOException { + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass)); } /** @@ -272,15 +301,17 @@ static public void setCompressionType(Configuration job, * @param compressionType The compression type. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. */ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionType compressionType) - throws IOException { - return createWriter(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), - compressionType, new DefaultCodec(), null, new Metadata()); + Class keyClass, Class valClass, + CompressionType compressionType) throws IOException { + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType)); } /** @@ -294,15 +325,18 @@ static public void setCompressionType(Configuration job, * @param progress The Progressable object to track progress. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, Progressable progress) throws IOException { - return createWriter(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), - compressionType, new DefaultCodec(), progress, new Metadata()); + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.progressable(progress)); } /** @@ -316,16 +350,18 @@ static public void setCompressionType(Configuration job, * @param codec The compression codec. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. */ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - CompressionType compressionType, CompressionCodec codec) - throws IOException { - return createWriter(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), - compressionType, codec, null, new Metadata()); + Class keyClass, Class valClass, CompressionType compressionType, + CompressionCodec codec) throws IOException { + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec)); } /** @@ -341,16 +377,21 @@ static public void setCompressionType(Configuration job, * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException { - return createWriter(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), - compressionType, codec, progress, metadata); + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec), + Writer.progressable(progress), + Writer.metadata(metadata)); } /** @@ -369,37 +410,25 @@ static public void setCompressionType(Configuration job, * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize, CompressionType compressionType, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException { - if ((codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop code!"); - } - - Writer writer = null; - - if (compressionType == CompressionType.NONE) { - writer = new Writer(fs, conf, name, keyClass, valClass, - bufferSize, replication, blockSize, - progress, metadata); - } else if (compressionType == CompressionType.RECORD) { - writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, - bufferSize, replication, blockSize, - codec, progress, metadata); - } else if (compressionType == CompressionType.BLOCK){ - writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, - bufferSize, replication, blockSize, - codec, progress, metadata); - } - - return writer; + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.bufferSize(bufferSize), + Writer.replication(replication), + Writer.blockSize(blockSize), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec), + Writer.progressable(progress), + Writer.metadata(metadata)); } /** @@ -414,96 +443,22 @@ static public void setCompressionType(Configuration job, * @param progress The Progressable object to track progress. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType, CompressionCodec codec, Progressable progress) throws IOException { - Writer writer = createWriter(fs, conf, name, keyClass, valClass, - compressionType, codec, progress, new Metadata()); - return writer; + return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec), + Writer.progressable(progress)); } - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param out The stream on top which the writer is to be constructed. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compress Compress data? - * @param blockCompress Compress blocks? - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - private static Writer - createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, boolean compress, boolean blockCompress, - CompressionCodec codec, Metadata metadata) - throws IOException { - if (codec != null && (codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop code!"); - } - - Writer writer = null; - - if (!compress) { - writer = new Writer(conf, out, keyClass, valClass, metadata); - } else if (compress && !blockCompress) { - writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata); - } else { - writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata); - } - - return writer; - } - - /** - * Construct the preferred type of 'raw' SequenceFile Writer. - * @param fs The configured filesystem. 
- * @param conf The configuration. - * @param file The name of the file. - * @param keyClass The 'key' type. - * @param valClass The 'value' type. - * @param compress Compress data? - * @param blockCompress Compress blocks? - * @param codec The compression codec. - * @param progress - * @param metadata The metadata of the file. - * @return Returns the handle to the constructed SequenceFile Writer. - * @throws IOException - */ - private static Writer - createWriter(FileSystem fs, Configuration conf, Path file, - Class keyClass, Class valClass, - boolean compress, boolean blockCompress, - CompressionCodec codec, Progressable progress, Metadata metadata) - throws IOException { - if (codec != null && (codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop code!"); - } - - Writer writer = null; - - if (!compress) { - writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata); - } else if (compress && !blockCompress) { - writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, - codec, progress, metadata); - } else { - writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, - codec, progress, metadata); - } - - return writer; -} - /** * Construct the preferred type of 'raw' SequenceFile Writer. * @param conf The configuration. @@ -515,30 +470,20 @@ static public void setCompressionType(Configuration job, * @param metadata The metadata of the file. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec, Metadata metadata) - throws IOException { - if ((codec instanceof GzipCodec) && - !NativeCodeLoader.isNativeCodeLoaded() && - !ZlibFactory.isNativeZlibLoaded(conf)) { - throw new IllegalArgumentException("SequenceFile doesn't work with " + - "GzipCodec without native-hadoop code!"); - } - - Writer writer = null; - - if (compressionType == CompressionType.NONE) { - writer = new Writer(conf, out, keyClass, valClass, metadata); - } else if (compressionType == CompressionType.RECORD) { - writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata); - } else if (compressionType == CompressionType.BLOCK){ - writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata); - } - - return writer; + Class keyClass, Class valClass, + CompressionType compressionType, + CompressionCodec codec, Metadata metadata) throws IOException { + return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec), + Writer.metadata(metadata)); } /** @@ -551,15 +496,18 @@ static public void setCompressionType(Configuration job, * @param codec The compression codec. * @return Returns the handle to the constructed SequenceFile Writer. * @throws IOException + * @deprecated Use {@link #createWriter(Configuration, Writer.Option...)} + * instead. 
*/ + @Deprecated public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType, - CompressionCodec codec) - throws IOException { - Writer writer = createWriter(conf, out, keyClass, valClass, compressionType, - codec, new Metadata()); - return writer; + CompressionCodec codec) throws IOException { + return createWriter(conf, Writer.stream(out), Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec)); } @@ -790,7 +738,7 @@ public String toString() { /** Write key/value pairs to a sequence-format file. */ public static class Writer implements java.io.Closeable { - Configuration conf; + private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; DataOutputBuffer buffer = new DataOutputBuffer(); @@ -798,7 +746,7 @@ public static class Writer implements java.io.Closeable { Class keyClass; Class valClass; - private boolean compress; + private final CompressionType compress; CompressionCodec codec = null; CompressionOutputStream deflateFilter = null; DataOutputStream deflateOut = null; @@ -825,73 +773,269 @@ public static class Writer implements java.io.Closeable { } } - /** Implicit constructor: needed for the period of transition!*/ - Writer() - {} + public static interface Option {} - /** Create the named file. 
*/ - public Writer(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass) - throws IOException { - this(fs, conf, name, keyClass, valClass, null, new Metadata()); + static class FileOption extends Options.PathOption + implements Option { + FileOption(Path path) { + super(path); + } + } + + static class StreamOption extends Options.FSDataOutputStreamOption + implements Option { + StreamOption(FSDataOutputStream stream) { + super(stream); + } + } + + static class BufferSizeOption extends Options.IntegerOption + implements Option { + BufferSizeOption(int value) { + super(value); + } } - /** Create the named file with write-progress reporter. */ + static class BlockSizeOption extends Options.LongOption implements Option { + BlockSizeOption(long value) { + super(value); + } + } + + static class ReplicationOption extends Options.IntegerOption + implements Option { + ReplicationOption(int value) { + super(value); + } + } + + static class KeyClassOption extends Options.ClassOption implements Option { + KeyClassOption(Class value) { + super(value); + } + } + + static class ValueClassOption extends Options.ClassOption + implements Option { + ValueClassOption(Class value) { + super(value); + } + } + + static class MetadataOption implements Option { + private final Metadata value; + MetadataOption(Metadata value) { + this.value = value; + } + Metadata getValue() { + return value; + } + } + + static class ProgressableOption extends Options.ProgressableOption + implements Option { + ProgressableOption(Progressable value) { + super(value); + } + } + + private static class CompressionTypeOption implements Option { + private final CompressionType value; + CompressionTypeOption(CompressionType value) { + this.value = value; + } + CompressionType getValue() { + return value; + } + } + + private static class CompressionCodecOption implements Option { + private final CompressionCodec value; + CompressionCodecOption(CompressionCodec value) { + this.value = 
value; + } + CompressionCodec getValue() { + return value; + } + } + + public static Option file(Path value) { + return new FileOption(value); + } + + public static Option bufferSize(int value) { + return new BufferSizeOption(value); + } + + public static Option stream(FSDataOutputStream value) { + return new StreamOption(value); + } + + public static Option replication(short value) { + return new ReplicationOption(value); + } + + public static Option blockSize(long value) { + return new BlockSizeOption(value); + } + + public static Option progressable(Progressable value) { + return new ProgressableOption(value); + } + + public static Option keyClass(Class value) { + return new KeyClassOption(value); + } + + public static Option valueClass(Class value) { + return new ValueClassOption(value); + } + + public static Option metadata(Metadata value) { + return new MetadataOption(value); + } + + public static Option compressionType(CompressionType value) { + return new CompressionTypeOption(value); + } + + public static Option compressionCodec(CompressionCodec value) { + return new CompressionCodecOption(value); + } + + /** + * Construct a writer from a set of options. + * @param conf the configuration to use + * @param compressionType the compression type being used + * @param opts the options used when creating the writer + * @throws IOException if it fails + */ + Writer(Configuration conf, + CompressionType compressionType, + Option... 
opts) throws IOException { + this.compress = compressionType; + BlockSizeOption blockSizeOption = + Options.getOption(BlockSizeOption.class, opts); + BufferSizeOption bufferSizeOption = + Options.getOption(BufferSizeOption.class, opts); + ReplicationOption replicationOption = + Options.getOption(ReplicationOption.class, opts); + ProgressableOption progressOption = + Options.getOption(ProgressableOption.class, opts); + FileOption fileOption = Options.getOption(FileOption.class, opts); + StreamOption streamOption = Options.getOption(StreamOption.class, opts); + KeyClassOption keyClassOption = + Options.getOption(KeyClassOption.class, opts); + ValueClassOption valueClassOption = + Options.getOption(ValueClassOption.class, opts); + CompressionCodecOption compressionCodecOption = + Options.getOption(CompressionCodecOption.class, opts); + MetadataOption metadataOption = + Options.getOption(MetadataOption.class, opts); + // check consistency of options + if ((fileOption == null) == (streamOption == null)) { + throw new IllegalArgumentException("file or stream must be specified"); + } + if (fileOption == null && (blockSizeOption != null || + bufferSizeOption != null || + replicationOption != null || + progressOption != null)) { + throw new IllegalArgumentException("file modifier options not " + + "compatible with stream"); + } + + FSDataOutputStream out; + boolean ownStream = fileOption != null; + if (ownStream) { + Path p = fileOption.getValue(); + FileSystem fs = p.getFileSystem(conf); + int bufferSize = bufferSizeOption == null ? getBufferSize(conf) : + bufferSizeOption.getValue(); + short replication = replicationOption == null ? + fs.getDefaultReplication() : + (short) replicationOption.getValue(); + long blockSize = blockSizeOption == null ? fs.getDefaultBlockSize() : + blockSizeOption.getValue(); + Progressable progress = progressOption == null ? 
null : + progressOption.getValue(); + out = fs.create(p, false, bufferSize, replication, blockSize, progress); + } else { + out = streamOption.getValue(); + } + Class keyClass = keyClassOption == null ? + Object.class : keyClassOption.getValue(); + Class valueClass = valueClassOption == null ? + Object.class : valueClassOption.getValue(); + Metadata metadata = metadataOption == null ? + new Metadata() : metadataOption.getValue(); + CompressionCodec codec; + if (compressionType == CompressionType.NONE) { + codec = null; + } else { + codec = compressionCodecOption == null ? + new DefaultCodec() : compressionCodecOption.getValue(); + } + if (codec != null && + (codec instanceof GzipCodec) && + !NativeCodeLoader.isNativeCodeLoaded() && + !ZlibFactory.isNativeZlibLoaded(conf)) { + throw new IllegalArgumentException("SequenceFile doesn't work with " + + "GzipCodec without native-hadoop " + + "code!"); + } + init(conf, out, ownStream, keyClass, valueClass, codec, metadata); + } + + /** Create the named file. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated + public Writer(FileSystem fs, Configuration conf, Path name, + Class keyClass, Class valClass) throws IOException { + this.compress = CompressionType.NONE; + init(conf, fs.create(name), true, keyClass, valClass, null, + new Metadata()); + } + + /** Create the named file with write-progress reporter. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. 
+ */ + @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, - Progressable progress, Metadata metadata) - throws IOException { - this(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), - progress, metadata); + Progressable progress, Metadata metadata) throws IOException { + this.compress = CompressionType.NONE; + init(conf, fs.create(name, progress), true, keyClass, valClass, + null, metadata); } - /** Create the named file with write-progress reporter. */ + /** Create the named file with write-progress reporter. + * @deprecated Use + * {@link SequenceFile#createWriter(Configuration, Writer.Option...)} + * instead. + */ + @Deprecated public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize, - Progressable progress, Metadata metadata) - throws IOException { - init(name, conf, + Progressable progress, Metadata metadata) throws IOException { + this.compress = CompressionType.NONE; + init(conf, fs.create(name, true, bufferSize, replication, blockSize, progress), - keyClass, valClass, false, null, metadata); - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); + true, keyClass, valClass, null, metadata); } - /** Write to an arbitrary stream using a specified buffer size. */ - private Writer(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, Metadata metadata) - throws IOException { - this.ownOutputStream = false; - init(null, conf, out, keyClass, valClass, false, null, metadata); - - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); - } - - /** Write the initial part of file header. */ - void initializeFileHeader() - throws IOException{ - out.write(VERSION); - } - - /** Write the final part of file header. 
*/ - void finalizeFileHeader() - throws IOException{ - out.write(sync); // write the sync bytes - out.flush(); // flush header - } - - boolean isCompressed() { return compress; } - boolean isBlockCompressed() { return false; } + boolean isCompressed() { return compress != CompressionType.NONE; } + boolean isBlockCompressed() { return compress == CompressionType.BLOCK; } /** Write and flush the file header. */ - void writeFileHeader() + private void writeFileHeader() throws IOException { + out.write(VERSION); Text.writeString(out, keyClass.getName()); Text.writeString(out, valClass.getName()); @@ -902,19 +1046,21 @@ void writeFileHeader() Text.writeString(out, (codec.getClass()).getName()); } this.metadata.write(out); + out.write(sync); // write the sync bytes + out.flush(); // flush header } /** Initialize. */ @SuppressWarnings("unchecked") - void init(Path name, Configuration conf, FSDataOutputStream out, + void init(Configuration conf, FSDataOutputStream out, boolean ownStream, Class keyClass, Class valClass, - boolean compress, CompressionCodec codec, Metadata metadata) + CompressionCodec codec, Metadata metadata) throws IOException { this.conf = conf; this.out = out; + this.ownOutputStream = ownStream; this.keyClass = keyClass; this.valClass = valClass; - this.compress = compress; this.codec = codec; this.metadata = metadata; SerializationFactory serializationFactory = new SerializationFactory(conf); @@ -931,6 +1077,7 @@ void init(Path name, Configuration conf, FSDataOutputStream out, this.compressedValSerializer = serializationFactory.getSerializer(valClass); this.compressedValSerializer.open(deflateOut); } + writeFileHeader(); } /** Returns the class of keys in this file. */ @@ -985,7 +1132,7 @@ synchronized void checkAndWriteSync() throws IOException { } /** Append a key/value pair. 
*/ - public synchronized void append(Writable key, Writable val) + public void append(Writable key, Writable val) throws IOException { append((Object) key, (Object) val); } @@ -1010,7 +1157,7 @@ public synchronized void append(Object key, Object val) throw new IOException("negative length keys not allowed: " + key); // Append the 'value' - if (compress) { + if (compress == CompressionType.RECORD) { deflateFilter.resetState(); compressedValSerializer.serialize(val); deflateOut.flush(); @@ -1059,63 +1206,11 @@ public synchronized long getLength() throws IOException { /** Write key/compressed-value pairs to a sequence-format file. */ static class RecordCompressWriter extends Writer { - /** Create the named file. */ - public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec) - throws IOException { - this(conf, fs.create(name), keyClass, valClass, codec, new Metadata()); + RecordCompressWriter(Configuration conf, + CompressionType compressionType, + Option... options) throws IOException { + super(conf, compressionType, options); } - - /** Create the named file with write-progress reporter. */ - public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress, Metadata metadata) - throws IOException { - this(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec, - progress, metadata); - } - - /** Create the named file with write-progress reporter. 
*/ - public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - int bufferSize, short replication, long blockSize, - CompressionCodec codec, - Progressable progress, Metadata metadata) - throws IOException { - super.init(name, conf, - fs.create(name, true, bufferSize, replication, blockSize, progress), - keyClass, valClass, true, codec, metadata); - - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); - } - - /** Create the named file with write-progress reporter. */ - public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress) - throws IOException { - this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata()); - } - - /** Write to an arbitrary stream using a specified buffer size. */ - private RecordCompressWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) - throws IOException { - this.ownOutputStream = false; - super.init(null, conf, out, keyClass, valClass, true, codec, metadata); - - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); - - } - - boolean isCompressed() { return true; } - boolean isBlockCompressed() { return false; } /** Append a key/value pair. */ @SuppressWarnings("unchecked") @@ -1178,79 +1273,20 @@ static class BlockCompressWriter extends Writer { private DataOutputBuffer valLenBuffer = new DataOutputBuffer(); private DataOutputBuffer valBuffer = new DataOutputBuffer(); - private int compressionBlockSize; + private final int compressionBlockSize; - /** Create the named file. 
*/ - public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec) - throws IOException { - this(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec, - null, new Metadata()); - } - - /** Create the named file with write-progress reporter. */ - public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress, Metadata metadata) - throws IOException { - this(fs, conf, name, keyClass, valClass, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec, - progress, metadata); - } - - /** Create the named file with write-progress reporter. */ - public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, - int bufferSize, short replication, long blockSize, - CompressionCodec codec, - Progressable progress, Metadata metadata) - throws IOException { - super.init(name, conf, - fs.create(name, true, bufferSize, replication, blockSize, progress), - keyClass, valClass, true, codec, metadata); - init(conf.getInt("io.seqfile.compress.blocksize", 1000000)); - - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); - } - - /** Create the named file with write-progress reporter. */ - public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, - Class keyClass, Class valClass, CompressionCodec codec, - Progressable progress) - throws IOException { - this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata()); - } - - /** Write to an arbitrary stream using a specified buffer size. 
*/ - private BlockCompressWriter(Configuration conf, FSDataOutputStream out, - Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata) - throws IOException { - this.ownOutputStream = false; - super.init(null, conf, out, keyClass, valClass, true, codec, metadata); - init(1000000); - - initializeFileHeader(); - writeFileHeader(); - finalizeFileHeader(); - } - - boolean isCompressed() { return true; } - boolean isBlockCompressed() { return true; } - - /** Initialize */ - void init(int compressionBlockSize) throws IOException { - this.compressionBlockSize = compressionBlockSize; + BlockCompressWriter(Configuration conf, + CompressionType compressionType, + Option... options) throws IOException { + super(conf, compressionType, options); + compressionBlockSize = + conf.getInt("io.seqfile.compress.blocksize", 1000000); keySerializer.close(); keySerializer.open(keyBuffer); uncompressedValSerializer.close(); uncompressedValSerializer.open(valBuffer); } - + /** Workhorse to check and write out compressed data/lengths */ private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) @@ -1361,10 +1397,15 @@ public synchronized void appendRaw(byte[] keyData, int keyOffset, } } // BlockCompressionWriter - + + /** Get the configured buffer size */ + private static int getBufferSize(Configuration conf) { + return conf.getInt("io.file.buffer.size", 4096); + } + /** Reads key/value pairs from a sequence-format file. */ public static class Reader implements java.io.Closeable { - private Path file; + private String filename; private FSDataInputStream in; private DataOutputBuffer outBuf = new DataOutputBuffer(); @@ -1420,66 +1461,188 @@ public static class Reader implements java.io.Closeable { private Deserializer keyDeserializer; private Deserializer valDeserializer; + /** + * A tag interface for all of the Reader options + */ + public static interface Option {} + + /** + * Create an option to specify the path name of the sequence file. 
+ * @param value the path to read + * @return a new option + */ + public static Option file(Path value) { + return new FileOption(value); + } + + /** + * Create an option to specify the stream with the sequence file. + * @param value the stream to read. + * @return a new option + */ + public static Option stream(FSDataInputStream value) { + return new InputStreamOption(value); + } + + /** + * Create an option to specify the starting byte to read. + * @param value the number of bytes to skip over + * @return a new option + */ + public static Option start(long value) { + return new StartOption(value); + } + + /** + * Create an option to specify the number of bytes to read. + * @param value the number of bytes to read + * @return a new option + */ + public static Option length(long value) { + return new LengthOption(value); + } + + /** + * Create an option with the buffer size for reading the given pathname. + * @param value the number of bytes to buffer + * @return a new option + */ + public static Option bufferSize(int value) { + return new BufferSizeOption(value); + } + + private static class FileOption extends Options.PathOption + implements Option { + private FileOption(Path value) { + super(value); + } + } + + private static class InputStreamOption + extends Options.FSDataInputStreamOption + implements Option { + private InputStreamOption(FSDataInputStream value) { + super(value); + } + } + + private static class StartOption extends Options.LongOption + implements Option { + private StartOption(long value) { + super(value); + } + } + + private static class LengthOption extends Options.LongOption + implements Option { + private LengthOption(long value) { + super(value); + } + } + + private static class BufferSizeOption extends Options.IntegerOption + implements Option { + private BufferSizeOption(int value) { + super(value); + } + } + + // only used directly + private static class OnlyHeaderOption extends Options.BooleanOption + implements Option { + private 
OnlyHeaderOption() { + super(true); + } + } + + public Reader(Configuration conf, Option... opts) throws IOException { + // Look up the options, these are null if not set + FileOption fileOpt = Options.getOption(FileOption.class, opts); + InputStreamOption streamOpt = + Options.getOption(InputStreamOption.class, opts); + StartOption startOpt = Options.getOption(StartOption.class, opts); + LengthOption lenOpt = Options.getOption(LengthOption.class, opts); + BufferSizeOption bufOpt = Options.getOption(BufferSizeOption.class,opts); + OnlyHeaderOption headerOnly = + Options.getOption(OnlyHeaderOption.class, opts); + // check for consistency + if ((fileOpt == null) == (streamOpt == null)) { + throw new + IllegalArgumentException("File or stream option must be specified"); + } + if (fileOpt == null && bufOpt != null) { + throw new IllegalArgumentException("buffer size can only be set when" + + " a file is specified."); + } + // figure out the real values + Path filename = null; + FSDataInputStream file; + long len = lenOpt == null ? Long.MAX_VALUE : lenOpt.getValue(); + if (fileOpt != null) { + filename = fileOpt.getValue(); + FileSystem fs = filename.getFileSystem(conf); + int bufSize = bufOpt == null ? getBufferSize(conf): bufOpt.getValue(); + file = fs.open(filename, bufSize); + len = fs.getFileStatus(filename).getLen(); + } else { + file = streamOpt.getValue(); + } + long start = startOpt == null ? 0 : startOpt.getValue(); + // really set up + initialize(filename, file, start, len, conf, headerOnly != null); + } + /** * Construct a reader by opening a file from the given file system. * @param fs The file system used to open the file. * @param file The file being read. * @param conf Configuration * @throws IOException + * @deprecated Use Reader(Configuration, Option...) instead. 
*/ - public Reader(FileSystem fs, Path file, Configuration conf) - throws IOException { - this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false); + @Deprecated + public Reader(FileSystem fs, Path file, + Configuration conf) throws IOException { + initialize(file, + fs.open(file, getBufferSize(conf)), + 0L, fs.getFileStatus(file).getLen(), conf, false); } /** * Construct a reader by the given input stream. * @param in An input stream. - * @param buffersize The buffer size used to read the file. + * @param buffersize unused * @param start The starting position. * @param length The length being read. * @param conf Configuration * @throws IOException + * @deprecated Use Reader(Configuration, Reader.Option...) instead. */ + @Deprecated public Reader(FSDataInputStream in, int buffersize, long start, long length, Configuration conf) throws IOException { - this(null, null, in, buffersize, start, length, conf, false); + initialize(null, in, start, length, conf, false); } - private Reader(FileSystem fs, Path file, int bufferSize, - Configuration conf, boolean tempReader) throws IOException { - this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(), - conf, tempReader); - } - - /** - * Private constructor. - * @param fs The file system used to open the file. - * It is not used if the given input stream is not null. - * @param file The file being read. - * @param in An input stream of the file. If it is null, - * the file will be opened from the given file system. - * @param bufferSize The buffer size used to read the file. - * @param start The starting position. - * @param length The length being read. - * @param conf Configuration - * @param tempReader Is this temporary? 
- * @throws IOException - */ - private Reader(FileSystem fs, Path file, FSDataInputStream in, - int bufferSize, long start, long length, Configuration conf, - boolean tempReader) throws IOException { - if (fs == null && in == null) { - throw new IllegalArgumentException("fs == null && in == null"); + /** Common work of the constructors. */ + private void initialize(Path filename, FSDataInputStream in, + long start, long length, Configuration conf, + boolean tempReader) throws IOException { + if (in == null) { + throw new IllegalArgumentException("in == null"); } - - this.file = file; - this.in = in != null? in: openFile(fs, file, bufferSize, length); + this.filename = filename == null ? "" : filename.toString(); + this.in = in; this.conf = conf; boolean succeeded = false; try { seek(start); this.end = this.in.getPos() + length; + // end is the absolute offset just past the last byte to read + // if it wrapped around, use the max + if (end < length) { + end = Long.MAX_VALUE; + } init(tempReader); succeeded = true; } finally { @@ -1695,6 +1858,18 @@ public synchronized Class getValueClass() { /** Returns the compression codec of data in this file. */ public CompressionCodec getCompressionCodec() { return codec; } + + /** + * Get the compression type for this file. + * @return the compression type + */ + public CompressionType getCompressionType() { + if (decompress) { + return blockCompressed ? CompressionType.BLOCK : CompressionType.RECORD; + } else { + return CompressionType.NONE; + } + } /** Returns the metadata object of the file */ public Metadata getMetadata() { @@ -1984,7 +2159,8 @@ private synchronized int readRecordLength() throws IOException { * of the value may be computed by calling buffer.getLength() before and * after calls to this method. */ /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. 
*/ - public synchronized int next(DataOutputBuffer buffer) throws IOException { + @Deprecated + synchronized int next(DataOutputBuffer buffer) throws IOException { // Unsupported for block-compressed sequence files if (blockCompressed) { throw new IOException("Unsupported call for block-compressed" + @@ -2279,7 +2455,7 @@ public synchronized long getPosition() throws IOException { /** Returns the name of the file. */ public String toString() { - return file == null? "": file.toString(); + return filename; } } @@ -2454,8 +2630,7 @@ public int run(boolean deleteInput) throws IOException { int segments = 0; int currentFile = 0; boolean atEof = (currentFile >= inFiles.length); - boolean isCompressed = false; - boolean isBlockCompressed = false; + CompressionType compressionType; CompressionCodec codec = null; segmentLengths.clear(); if (atEof) { @@ -2464,8 +2639,7 @@ public int run(boolean deleteInput) throws IOException { // Initialize in = new Reader(fs, inFiles[currentFile], conf); - isCompressed = in.isCompressed(); - isBlockCompressed = in.isBlockCompressed(); + compressionType = in.getCompressionType(); codec = in.getCompressionCodec(); for (int i=0; i < rawValues.length; ++i) { @@ -2526,7 +2700,7 @@ public int run(boolean deleteInput) throws IOException { if (progressable != null) { progressable.progress(); } - flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, + flush(count, bytesProcessed, compressionType, codec, segments==0 && atEof); segments++; } @@ -2569,9 +2743,10 @@ private ValueBytes[] grow(ValueBytes[] old, int newLength) { return result; } - private void flush(int count, int bytesProcessed, boolean isCompressed, - boolean isBlockCompressed, CompressionCodec codec, boolean done) - throws IOException { + private void flush(int count, int bytesProcessed, + CompressionType compressionType, + CompressionCodec codec, + boolean done) throws IOException { if (out == null) { outName = done ? 
outFile : outFile.suffix(".0"); out = fs.create(outName); @@ -2581,9 +2756,14 @@ private void flush(int count, int bytesProcessed, boolean isCompressed, } long segmentStart = out.getPos(); - Writer writer = createWriter(conf, out, keyClass, valClass, - isCompressed, isBlockCompressed, codec, - done ? metadata : new Metadata()); + Writer writer = createWriter(conf, + Writer.stream(out), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compressionType), + Writer.compressionCodec(codec), + Writer.metadata(done ? metadata : + new Metadata())); if (!done) { writer.sync = null; // disable sync on temp files @@ -2751,19 +2931,21 @@ public RawKeyValueIterator merge(Path [] inNames, Path tempDir, * @throws IOException */ public Writer cloneFileAttributes(Path inputFile, Path outputFile, - Progressable prog) - throws IOException { - FileSystem srcFileSys = inputFile.getFileSystem(conf); - Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true); - boolean compress = reader.isCompressed(); - boolean blockCompress = reader.isBlockCompressed(); + Progressable prog) throws IOException { + Reader reader = new Reader(conf, + Reader.file(inputFile), + new Reader.OnlyHeaderOption()); + CompressionType compress = reader.getCompressionType(); CompressionCodec codec = reader.getCompressionCodec(); reader.close(); - Writer writer = createWriter(outputFile.getFileSystem(conf), conf, - outputFile, keyClass, valClass, compress, - blockCompress, codec, prog, - new Metadata()); + Writer writer = createWriter(conf, + Writer.file(outputFile), + Writer.keyClass(keyClass), + Writer.valueClass(valClass), + Writer.compressionType(compress), + Writer.compressionCodec(codec), + Writer.progressable(prog)); return writer; } @@ -3164,13 +3346,15 @@ public int hashCode() { */ public boolean nextRawKey() throws IOException { if (in == null) { - int bufferSize = conf.getInt("io.file.buffer.size", 4096); + int bufferSize = getBufferSize(conf); if 
(fs.getUri().getScheme().startsWith("ramfs")) { bufferSize = conf.getInt("io.bytes.per.checksum", 512); } - Reader reader = new Reader(fs, segmentPathName, null, - bufferSize, segmentOffset, - segmentLength, conf, false); + Reader reader = new Reader(conf, + Reader.file(segmentPathName), + Reader.bufferSize(bufferSize), + Reader.start(segmentOffset), + Reader.length(segmentLength)); //sometimes we ignore syncs especially for temp merge files if (ignoreSync) reader.ignoreSync(); diff --git a/src/java/org/apache/hadoop/io/SetFile.java b/src/java/org/apache/hadoop/io/SetFile.java index 11b024e652..c7a044680a 100644 --- a/src/java/org/apache/hadoop/io/SetFile.java +++ b/src/java/org/apache/hadoop/io/SetFile.java @@ -57,7 +57,10 @@ public Writer(Configuration conf, FileSystem fs, String dirName, public Writer(Configuration conf, FileSystem fs, String dirName, WritableComparator comparator, SequenceFile.CompressionType compress) throws IOException { - super(conf, fs, dirName, comparator, NullWritable.class, compress); + super(conf, new Path(dirName), + comparator(comparator), + keyClass(NullWritable.class), + compressionType(compress)); } /** Append a key to a set. 
The key must be strictly greater than the @@ -78,7 +81,7 @@ public Reader(FileSystem fs, String dirName, Configuration conf) throws IOExcept /** Construct a set reader for the named set using the named comparator.*/ public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf) throws IOException { - super(fs, dirName, comparator, conf); + super(new Path(dirName), conf, comparator(comparator)); } // javadoc inherited diff --git a/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java b/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java index 172ea2585f..5e2a94c999 100644 --- a/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java +++ b/src/java/org/apache/hadoop/security/RefreshUserMappingsProtocol.java @@ -21,7 +21,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.security.KerberosInfo;