From 98beeca09d2e527c3507989dbae267c2313ae15a Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 1 Dec 2009 23:25:56 +0000 Subject: [PATCH] HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support reading on un-closed file. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@886003 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../org/apache/hadoop/io/SequenceFile.java | 70 +++++++++++++++---- .../hadoop/io/TestSequenceFileSync.java | 25 ++++--- 3 files changed, 76 insertions(+), 22 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 68c2b729a3..cf5f4d6528 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -684,6 +684,9 @@ Release 0.21.0 - Unreleased HADOOP-6261. Add URI based tests for FileContext. (Ravi Pulari via suresh). + HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support + reading on un-closed file. (szetszwo) + BUG FIXES HADOOP-5379. CBZip2InputStream to throw IOException on data crc error. diff --git a/src/java/org/apache/hadoop/io/SequenceFile.java b/src/java/org/apache/hadoop/io/SequenceFile.java index db77fa0ecf..1f49439ef4 100644 --- a/src/java/org/apache/hadoop/io/SequenceFile.java +++ b/src/java/org/apache/hadoop/io/SequenceFile.java @@ -1435,32 +1435,71 @@ public class SequenceFile { private DeserializerBase keyDeserializer; private DeserializerBase valDeserializer; - /** Open the named file. */ + /** + * Construct a reader by opening a file from the given file system. + * @param fs The file system used to open the file. + * @param file The file being read. + * @param conf Configuration + * @throws IOException + */ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false); } + /** + * Construct a reader by the given input stream. + * @param in An input stream. + * @param buffersize The buffer size used to read the file. + * @param start The starting position. + * @param length The length being read. + * @param conf Configuration + * @throws IOException + */ + public Reader(FSDataInputStream in, int buffersize, + long start, long length, Configuration conf) throws IOException { + this(null, null, in, buffersize, start, length, conf, false); + } + private Reader(FileSystem fs, Path file, int bufferSize, Configuration conf, boolean tempReader) throws IOException { - this(fs, file, bufferSize, 0, fs.getFileStatus(file).getLen(), conf, tempReader); + this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(), + conf, tempReader); } - - private Reader(FileSystem fs, Path file, int bufferSize, long start, - long length, Configuration conf, boolean tempReader) - throws IOException { + + /** + * Private constructor. + * @param fs The file system used to open the file. + * It is not used if the given input stream is not null. + * @param file The file being read. + * @param in An input stream of the file. If it is null, + * the file will be opened from the given file system. + * @param bufferSize The buffer size used to read the file. + * @param start The starting position. + * @param length The length being read. + * @param conf Configuration + * @param tempReader Is this temporary? + * @throws IOException + */ + private Reader(FileSystem fs, Path file, FSDataInputStream in, + int bufferSize, long start, long length, Configuration conf, + boolean tempReader) throws IOException { + if (fs == null && in == null) { + throw new IllegalArgumentException("fs == null && in == null"); + } + this.file = file; - this.in = openFile(fs, file, bufferSize, length); + this.in = in != null? in: openFile(fs, file, bufferSize, length); this.conf = conf; boolean succeeded = false; try { seek(start); - this.end = in.getPos() + length; + this.end = this.in.getPos() + length; init(tempReader); succeeded = true; } finally { if (!succeeded) { - IOUtils.cleanup(LOG, in); + IOUtils.cleanup(LOG, this.in); } } } @@ -1468,6 +1507,13 @@ public class SequenceFile { /** * Override this method to specialize the type of * {@link FSDataInputStream} returned. + * @param fs The file system used to open the file. + * @param file The file being read. + * @param bufferSize The buffer size used to read the file. + * @param length The length being read if it is >= 0. Otherwise, + * the length is not available. + * @return The opened stream. + * @throws IOException */ protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize, long length) throws IOException { @@ -1489,7 +1535,7 @@ public class SequenceFile { if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1]) || (versionBlock[2] != VERSION[2])) - throw new IOException(file + " not a SequenceFile"); + throw new IOException(this + " not a SequenceFile"); // Set 'version' version = versionBlock[3]; @@ -2251,7 +2297,7 @@ public class SequenceFile { /** Returns the name of the file. */ public String toString() { - return file.toString(); + return file == null? "": file.toString(); } } @@ -3132,7 +3178,7 @@ public class SequenceFile { if (fs.getUri().getScheme().startsWith("ramfs")) { bufferSize = conf.getInt("io.bytes.per.checksum", 512); } - Reader reader = new Reader(fs, segmentPathName, + Reader reader = new Reader(fs, segmentPathName, null, bufferSize, segmentOffset, segmentLength, conf, false); diff --git a/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java index 933ee29cfe..fb09425269 100644 --- a/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java +++ b/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java @@ -18,22 +18,17 @@ package org.apache.hadoop.io; -import java.io.File; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.util.Random; -import java.net.URI; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.Before; import org.junit.Test; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static org.junit.Assert.*; - public class TestSequenceFileSync { private static final int NUMRECORDS = 2000; private static final int RECORDSIZE = 80; @@ -66,8 +61,18 @@ public class TestSequenceFileSync { try { writeSequenceFile(writer, NUMRECORDS); for (int i = 0; i < 5 ; i++) { - final SequenceFile.Reader reader = - new SequenceFile.Reader(fs, path, conf); + final SequenceFile.Reader reader; + + //try different SequenceFile.Reader constructors + if (i % 2 == 0) { + reader = new SequenceFile.Reader(fs, path, conf); + } else { + final FSDataInputStream in = fs.open(path); + final long length = fs.getFileStatus(path).getLen(); + final int buffersize = conf.getInt("io.file.buffer.size", 4096); + reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf); + } + try { forOffset(reader, input, val, i, 0, 0); forOffset(reader, input, val, i, 65, 0);