From fed32afd17ec824d439ed37fce14f3071935c59a Mon Sep 17 00:00:00 2001 From: Devaraj Das Date: Mon, 12 Oct 2009 22:07:17 +0000 Subject: [PATCH] HADOOP-6218. Adds a feature where TFile can be split by Record Sequeunce number. Contributed by Hong Tang and Raghu Angadi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@824516 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 + .../apache/hadoop/io/file/tfile/TFile.java | 136 +++++++++++++++++- .../hadoop/io/file/tfile/TestTFile.java | 2 +- .../io/file/tfile/TestTFileByteArrays.java | 8 +- .../hadoop/io/file/tfile/TestTFileSplit.java | 87 ++++++++++- .../tfile/TestTFileUnsortedByteArrays.java | 2 +- 6 files changed, 225 insertions(+), 13 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2c99083462..7fc0dfefdb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,9 @@ Trunk (unreleased changes) hadoop-config.sh so that it allows setting java command options for JAVA_PLATFORM. (Koji Noguchi via szetszwo) + HADOOP-6218. Adds a feature where TFile can be split by Record + Sequence number. (Hong Tang and Raghu Angadi via ddas) + IMPROVEMENTS HADOOP-6283. Improve the exception messages thrown by diff --git a/src/java/org/apache/hadoop/io/file/tfile/TFile.java b/src/java/org/apache/hadoop/io/file/tfile/TFile.java index 8ef1ecfc0a..d094c60bcd 100644 --- a/src/java/org/apache/hadoop/io/file/tfile/TFile.java +++ b/src/java/org/apache/hadoop/io/file/tfile/TFile.java @@ -669,10 +669,10 @@ void finishDataBlock(boolean bForceFinish) throws IOException { * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner. * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()} * ) , a portion of TFile based on byte offsets ( - * {@link Reader#createScanner(long, long)}), or a portion of TFile with keys + * {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys * fall in a certain key range (for sorted TFile only, - * {@link Reader#createScanner(byte[], byte[])} or - * {@link Reader#createScanner(RawComparable, RawComparable)}). + * {@link Reader#createScannerByKey(byte[], byte[])} or + * {@link Reader#createScannerByKey(RawComparable, RawComparable)}). */ public static class Reader implements Closeable { // The underlying BCFile reader. @@ -986,6 +986,16 @@ Location getBlockContainsKey(RawComparable key, boolean greater) return new Location(blkIndex, 0); } + Location getLocationByRecordNum(long recNum) throws IOException { + checkTFileDataIndex(); + return tfileIndex.getLocationByRecordNum(recNum); + } + + long getRecordNumByLocation(Location location) throws IOException { + checkTFileDataIndex(); + return tfileIndex.getRecordNumByLocation(location); + } + int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) { if (!isSorted()) { throw new RuntimeException("Cannot compare keys for unsorted TFiles."); @@ -1016,6 +1026,21 @@ Location getLocationNear(long offset) { return new Location(blockIndex, 0); } + /** + * Get the RecordNum for the first key-value pair in a compressed block + * whose byte offset in the TFile is greater than or equal to the specified + * offset. + * + * @param offset + * the user supplied offset. + * @return the RecordNum to the corresponding entry. If no such entry + * exists, it returns the total entry count. 
+ * @throws IOException + */ + public long getRecordNumNear(long offset) throws IOException { + return getRecordNumByLocation(getLocationNear(offset)); + } + /** * Get a sample key that is within a block whose starting offset is greater * than or equal to the specified offset. @@ -1058,7 +1083,7 @@ public Scanner createScanner() throws IOException { * contains zero key-value pairs even if length is positive. * @throws IOException */ - public Scanner createScanner(long offset, long length) throws IOException { + public Scanner createScannerByByteRange(long offset, long length) throws IOException { return new Scanner(this, offset, offset + length); } @@ -1074,10 +1099,31 @@ public Scanner createScanner(long offset, long length) throws IOException { * @return The actual coverage of the returned scanner will cover all keys * greater than or equal to the beginKey and less than the endKey. * @throws IOException + * + * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead. */ + @Deprecated public Scanner createScanner(byte[] beginKey, byte[] endKey) + throws IOException { + return createScannerByKey(beginKey, endKey); + } + + /** + * Get a scanner that covers a portion of TFile based on keys. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. + * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + */ + public Scanner createScannerByKey(byte[] beginKey, byte[] endKey) throws IOException { - return createScanner((beginKey == null) ? null : new ByteArray(beginKey, + return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey, 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey, 0, endKey.length)); } @@ -1094,9 +1140,31 @@ public Scanner createScanner(byte[] beginKey, byte[] endKey) * @return The actual coverage of the returned scanner will cover all keys * greater than or equal to the beginKey and less than the endKey. * @throws IOException + * + * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)} + * instead. */ + @Deprecated public Scanner createScanner(RawComparable beginKey, RawComparable endKey) throws IOException { + return createScannerByKey(beginKey, endKey); + } + + /** + * Get a scanner that covers a specific key range. + * + * @param beginKey + * Begin key of the scan (inclusive). If null, scan from the first + * key-value entry of the TFile. + * @param endKey + * End key of the scan (exclusive). If null, scan up to the last + * key-value entry of the TFile. + * @return The actual coverage of the returned scanner will cover all keys + * greater than or equal to the beginKey and less than the endKey. + * @throws IOException + */ + public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey) + throws IOException { if ((beginKey != null) && (endKey != null) && (compareKeys(beginKey, endKey) >= 0)) { return new Scanner(this, beginKey, beginKey); @@ -1104,6 +1172,27 @@ public Scanner createScanner(RawComparable beginKey, RawComparable endKey) return new Scanner(this, beginKey, endKey); } + /** + * Create a scanner that covers a range of records. + * + * @param beginRecNum + * The RecordNum for the first record (inclusive). + * @param endRecNum + * The RecordNum for the last record (exclusive). 
To scan the whole + * file, either specify endRecNum==-1 or endRecNum==getEntryCount(). + * @return The TFile scanner that covers the specified range of records. + * @throws IOException + */ + public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum) + throws IOException { + if (beginRecNum < 0) beginRecNum = 0; + if (endRecNum < 0 || endRecNum > getEntryCount()) { + endRecNum = getEntryCount(); + } + return new Scanner(this, getLocationByRecordNum(beginRecNum), + getLocationByRecordNum(endRecNum)); + } + /** * The TFile Scanner. The Scanner has an implicit cursor, which, upon * creation, points to the first key-value pair in the scan range. If the @@ -1523,6 +1612,15 @@ public Entry entry() throws IOException { return new Entry(); } + /** + * Get the RecordNum corresponding to the entry pointed by the cursor. + * @return The RecordNum corresponding to the entry pointed by the cursor. + * @throws IOException + */ + public long getRecordNum() throws IOException { + return reader.getRecordNumByLocation(currentLocation); + } + /** * Internal API. Comparing the key at cursor to user-specified key. * @@ -2021,8 +2119,10 @@ static class TFileIndex { final static String BLOCK_NAME = "TFile.index"; private ByteArray firstKey; private final ArrayList index; + private final ArrayList recordNumIndex; private final BytesComparator comparator; - + private long sum = 0; + /** * For reading from file. * @@ -2031,6 +2131,7 @@ static class TFileIndex { public TFileIndex(int entryCount, DataInput in, BytesComparator comparator) throws IOException { index = new ArrayList(entryCount); + recordNumIndex = new ArrayList(entryCount); int size = Utils.readVInt(in); // size for the first key entry. if (size > 0) { byte[] buffer = new byte[size]; @@ -2052,6 +2153,8 @@ public TFileIndex(int entryCount, DataInput in, BytesComparator comparator) new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream( buffer, 0, size))); index.add(idx); + sum += idx.entries(); + recordNumIndex.add(sum); } } else { if (entryCount != 0) { @@ -2083,6 +2186,12 @@ public int lowerBound(RawComparable key) { return ret; } + /** + * @param key + * input key. + * @return the ID of the first block that contains key > input key. Or -1 + * if no such block exists. + */ public int upperBound(RawComparable key) { if (comparator == null) { throw new RuntimeException("Cannot search in unsorted TFile"); @@ -2104,13 +2213,26 @@ public int upperBound(RawComparable key) { */ public TFileIndex(BytesComparator comparator) { index = new ArrayList(); + recordNumIndex = new ArrayList(); this.comparator = comparator; } public RawComparable getFirstKey() { return firstKey; } + + public Reader.Location getLocationByRecordNum(long recNum) { + int idx = Utils.upperBound(recordNumIndex, recNum); + long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1); + return new Reader.Location(idx, recNum-lastRecNum); + } + public long getRecordNumByLocation(Reader.Location location) { + int blkIndex = location.getBlockIndex(); + long lastRecNum = (blkIndex == 0) ? 
0: recordNumIndex.get(blkIndex-1); + return lastRecNum + location.getRecordIndex(); + } + public void setFirstKey(byte[] key, int offset, int length) { firstKey = new ByteArray(new byte[length]); System.arraycopy(key, offset, firstKey.buffer(), 0, length); @@ -2125,6 +2247,8 @@ public RawComparable getLastKey() { public void addEntry(TFileIndexEntry keyEntry) { index.add(keyEntry); + sum += keyEntry.entries(); + recordNumIndex.add(sum); } public TFileIndexEntry getEntry(int bid) { diff --git a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java index 38feaface2..af52e7557e 100644 --- a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java +++ b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFile.java @@ -319,7 +319,7 @@ void basicWithSomeCodec(String codec) throws IOException { scanner.close(); // test for a range of scanner - scanner = reader.createScanner(getSomeKey(10), getSomeKey(60)); + scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60)); readAndCheckbytes(scanner, 10, 50); assertFalse(scanner.advance()); scanner.close(); diff --git a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java index 2d397c7e3a..f296e07865 100644 --- a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java +++ b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileByteArrays.java @@ -673,7 +673,7 @@ private void readValueBeforeKey(int count, int recordIndex) Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); Scanner scanner = - reader.createScanner(composeSortedKey(KEY, count, recordIndex) + reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex) .getBytes(), null); try { @@ -698,7 +698,7 @@ private void readKeyWithoutValue(int count, int recordIndex) throws IOException { Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); Scanner scanner = - reader.createScanner(composeSortedKey(KEY, count, recordIndex) + reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex) .getBytes(), null); try { @@ -729,7 +729,7 @@ private void readValueWithoutKey(int count, int recordIndex) Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); Scanner scanner = - reader.createScanner(composeSortedKey(KEY, count, recordIndex) + reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex) .getBytes(), null); byte[] vbuf1 = new byte[BUF_SIZE]; @@ -753,7 +753,7 @@ private void readKeyManyTimes(int count, int recordIndex) throws IOException { Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); Scanner scanner = - reader.createScanner(composeSortedKey(KEY, count, recordIndex) + reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex) .getBytes(), null); // read the indexed key diff --git a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java index 78ec270ec7..470988ec0d 100644 --- a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java +++ b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileSplit.java @@ -17,6 +17,7 @@ package org.apache.hadoop.io.file.tfile; import java.io.IOException; +import java.util.Random; import junit.framework.Assert; import junit.framework.TestCase; @@ -42,6 +43,7 @@ public class TestTFileSplit extends TestCase { private FileSystem fs; private 
Configuration conf; private Path path; + private Random random = new Random(); private String comparator = "memcmp"; private String outputFile = "TestTFileSplit"; @@ -74,7 +76,7 @@ void readFile() throws IOException { long rowCount = 0; BytesWritable key, value; for (int i = 0; i < numSplit; ++i, offset += splitSize) { - Scanner scanner = reader.createScanner(offset, splitSize); + Scanner scanner = reader.createScannerByByteRange(offset, splitSize); int count = 0; key = new BytesWritable(); value = new BytesWritable(); @@ -90,18 +92,101 @@ void readFile() throws IOException { Assert.assertEquals(rowCount, reader.getEntryCount()); reader.close(); } + + /* Similar to readFile(), tests the scanner created + * by record numbers rather than the offsets. + */ + void readRowSplits(int numSplits) throws IOException { + + Reader reader = + new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf); + + long totalRecords = reader.getEntryCount(); + for (int i=0; i=fileLen should be total entries", + totalRecs, reader.getRecordNumNear(x)); + } + + for (long i = 0; i < 100; ++i) { + assertEquals("Locaton to RecNum conversion not symmetric", i, reader + .getRecordNumByLocation(reader.getLocationByRecordNum(i))); + } + + for (long i = 1; i < 100; ++i) { + long x = totalRecs - i; + assertEquals("Locaton to RecNum conversion not symmetric", x, reader + .getRecordNumByLocation(reader.getLocationByRecordNum(x))); + } + + for (long i = begin; i < end; ++i) { + assertEquals("Locaton to RecNum conversion not symmetric", i, reader + .getRecordNumByLocation(reader.getLocationByRecordNum(i))); + } + + for (int i = 0; i < 1000; ++i) { + long x = random.nextLong() % totalRecs; + if (x < 0) x += totalRecs; + assertEquals("Locaton to RecNum conversion not symmetric", x, reader + .getRecordNumByLocation(reader.getLocationByRecordNum(x))); + } + } + public void testSplit() throws IOException { System.out.println("testSplit"); createFile(100000, Compression.Algorithm.NONE.getName()); + checkRecNums(); readFile(); + readRowSplits(10); fs.delete(path, true); createFile(500000, Compression.Algorithm.GZ.getName()); + checkRecNums(); readFile(); + readRowSplits(83); fs.delete(path, true); } } diff --git a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java index 2825647857..6fb6fdf9b4 100644 --- a/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java +++ b/src/test/core/org/apache/hadoop/io/file/tfile/TestTFileUnsortedByteArrays.java @@ -89,7 +89,7 @@ public void testFailureScannerWithKeys() throws IOException { try { Scanner scanner = - reader.createScanner("aaa".getBytes(), "zzz".getBytes()); + reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes()); Assert .fail("Failed to catch creating scanner with keys on unsorted file."); }
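
Usage example (not part of the patch): the sketch below shows how a caller might use the new record-number API to turn evenly sized byte ranges into non-overlapping record ranges and then scan each range. The input path, split count, and class name are hypothetical; only the Reader and Scanner methods used (getRecordNumNear, getEntryCount, createScannerByRecordNum, getRecordNum) come from this change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.file.tfile.TFile.Reader;
import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;

public class TFileRecordSplitExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);   // hypothetical input TFile
    int numSplits = 4;               // hypothetical split count
    FileSystem fs = path.getFileSystem(conf);
    long fileLen = fs.getFileStatus(path).getLen();

    Reader reader = new Reader(fs.open(path), fileLen, conf);
    try {
      // Convert byte offsets to record numbers. getRecordNumNear() snaps to
      // the first record of a compressed block, so adjacent splits computed
      // this way neither overlap nor leave gaps.
      long[] recNums = new long[numSplits + 1];
      for (int i = 0; i < numSplits; ++i) {
        recNums[i] = reader.getRecordNumNear(i * fileLen / numSplits);
      }
      recNums[numSplits] = reader.getEntryCount();

      BytesWritable key = new BytesWritable();
      BytesWritable value = new BytesWritable();
      for (int i = 0; i < numSplits; ++i) {
        Scanner scanner =
            reader.createScannerByRecordNum(recNums[i], recNums[i + 1]);
        try {
          // The cursor starts exactly at recNums[i].
          System.out.println("split " + i + " starts at record "
              + scanner.getRecordNum());
          while (!scanner.atEnd()) {
            scanner.entry().get(key, value);
            scanner.advance();
          }
        } finally {
          scanner.close();
        }
      }
    } finally {
      reader.close();
    }
  }
}

This mirrors what TestTFileSplit.readRowSplits() exercises, except that the test computes its record ranges directly from getEntryCount().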
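
For reference, the lookup that backs these methods is a simple prefix sum: TFileIndex now records the cumulative entry count after each data block (recordNumIndex), getLocationByRecordNum() finds the owning block with an upper-bound binary search, and getRecordNumByLocation() is the inverse. The self-contained sketch below illustrates the same idea; the class and method names are illustrative, not the patch's, and it assumes every block holds at least one record.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RecordNumIndexSketch {
  // Cumulative record count after each data block, e.g. [5, 12, 20].
  private final List<Long> cumulative = new ArrayList<Long>();

  void addBlock(long entriesInBlock) {
    long prev = cumulative.isEmpty() ? 0 : cumulative.get(cumulative.size() - 1);
    cumulative.add(prev + entriesInBlock);   // same role as recordNumIndex.add(sum)
  }

  // Global record number -> {block index, record index within that block},
  // mirroring getLocationByRecordNum().
  long[] locate(long recNum) {
    // Upper bound: index of the first cumulative count strictly greater than recNum.
    int idx = Collections.binarySearch(cumulative, recNum);
    idx = (idx >= 0) ? idx + 1 : -idx - 1;
    long before = (idx == 0) ? 0 : cumulative.get(idx - 1);
    return new long[] { idx, recNum - before };
  }

  // Inverse mapping, mirroring getRecordNumByLocation().
  long recordNum(int blockIndex, long recordIndexInBlock) {
    long before = (blockIndex == 0) ? 0 : cumulative.get(blockIndex - 1);
    return before + recordIndexInBlock;
  }
}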