HADOOP-6218. Adds a feature where TFile can be split by Record Sequeunce number. Contributed by Hong Tang and Raghu Angadi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@824516 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d3c314e450
commit
fed32afd17
@ -10,6 +10,9 @@ Trunk (unreleased changes)
|
|||||||
hadoop-config.sh so that it allows setting java command options for
|
hadoop-config.sh so that it allows setting java command options for
|
||||||
JAVA_PLATFORM. (Koji Noguchi via szetszwo)
|
JAVA_PLATFORM. (Koji Noguchi via szetszwo)
|
||||||
|
|
||||||
|
HADOOP-6218. Adds a feature where TFile can be split by Record
|
||||||
|
Sequence number. (Hong Tang and Raghu Angadi via ddas)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HADOOP-6283. Improve the exception messages thrown by
|
HADOOP-6283. Improve the exception messages thrown by
|
||||||
|
@ -669,10 +669,10 @@ void finishDataBlock(boolean bForceFinish) throws IOException {
|
|||||||
* TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner.
|
* TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner.
|
||||||
* objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
|
* objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}
|
||||||
* ) , a portion of TFile based on byte offsets (
|
* ) , a portion of TFile based on byte offsets (
|
||||||
* {@link Reader#createScanner(long, long)}), or a portion of TFile with keys
|
* {@link Reader#createScannerByByteRange(long, long)}), or a portion of TFile with keys
|
||||||
* fall in a certain key range (for sorted TFile only,
|
* fall in a certain key range (for sorted TFile only,
|
||||||
* {@link Reader#createScanner(byte[], byte[])} or
|
* {@link Reader#createScannerByKey(byte[], byte[])} or
|
||||||
* {@link Reader#createScanner(RawComparable, RawComparable)}).
|
* {@link Reader#createScannerByKey(RawComparable, RawComparable)}).
|
||||||
*/
|
*/
|
||||||
public static class Reader implements Closeable {
|
public static class Reader implements Closeable {
|
||||||
// The underlying BCFile reader.
|
// The underlying BCFile reader.
|
||||||
@ -986,6 +986,16 @@ Location getBlockContainsKey(RawComparable key, boolean greater)
|
|||||||
return new Location(blkIndex, 0);
|
return new Location(blkIndex, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Location getLocationByRecordNum(long recNum) throws IOException {
|
||||||
|
checkTFileDataIndex();
|
||||||
|
return tfileIndex.getLocationByRecordNum(recNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
long getRecordNumByLocation(Location location) throws IOException {
|
||||||
|
checkTFileDataIndex();
|
||||||
|
return tfileIndex.getRecordNumByLocation(location);
|
||||||
|
}
|
||||||
|
|
||||||
int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
|
int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) {
|
||||||
if (!isSorted()) {
|
if (!isSorted()) {
|
||||||
throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
|
throw new RuntimeException("Cannot compare keys for unsorted TFiles.");
|
||||||
@ -1016,6 +1026,21 @@ Location getLocationNear(long offset) {
|
|||||||
return new Location(blockIndex, 0);
|
return new Location(blockIndex, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RecordNum for the first key-value pair in a compressed block
|
||||||
|
* whose byte offset in the TFile is greater than or equal to the specified
|
||||||
|
* offset.
|
||||||
|
*
|
||||||
|
* @param offset
|
||||||
|
* the user supplied offset.
|
||||||
|
* @return the RecordNum to the corresponding entry. If no such entry
|
||||||
|
* exists, it returns the total entry count.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public long getRecordNumNear(long offset) throws IOException {
|
||||||
|
return getRecordNumByLocation(getLocationNear(offset));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a sample key that is within a block whose starting offset is greater
|
* Get a sample key that is within a block whose starting offset is greater
|
||||||
* than or equal to the specified offset.
|
* than or equal to the specified offset.
|
||||||
@ -1058,7 +1083,7 @@ public Scanner createScanner() throws IOException {
|
|||||||
* contains zero key-value pairs even if length is positive.
|
* contains zero key-value pairs even if length is positive.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public Scanner createScanner(long offset, long length) throws IOException {
|
public Scanner createScannerByByteRange(long offset, long length) throws IOException {
|
||||||
return new Scanner(this, offset, offset + length);
|
return new Scanner(this, offset, offset + length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1074,10 +1099,31 @@ public Scanner createScanner(long offset, long length) throws IOException {
|
|||||||
* @return The actual coverage of the returned scanner will cover all keys
|
* @return The actual coverage of the returned scanner will cover all keys
|
||||||
* greater than or equal to the beginKey and less than the endKey.
|
* greater than or equal to the beginKey and less than the endKey.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
*
|
||||||
|
* @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public Scanner createScanner(byte[] beginKey, byte[] endKey)
|
public Scanner createScanner(byte[] beginKey, byte[] endKey)
|
||||||
|
throws IOException {
|
||||||
|
return createScannerByKey(beginKey, endKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a scanner that covers a portion of TFile based on keys.
|
||||||
|
*
|
||||||
|
* @param beginKey
|
||||||
|
* Begin key of the scan (inclusive). If null, scan from the first
|
||||||
|
* key-value entry of the TFile.
|
||||||
|
* @param endKey
|
||||||
|
* End key of the scan (exclusive). If null, scan up to the last
|
||||||
|
* key-value entry of the TFile.
|
||||||
|
* @return The actual coverage of the returned scanner will cover all keys
|
||||||
|
* greater than or equal to the beginKey and less than the endKey.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Scanner createScannerByKey(byte[] beginKey, byte[] endKey)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return createScanner((beginKey == null) ? null : new ByteArray(beginKey,
|
return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey,
|
||||||
0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
|
0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey,
|
||||||
0, endKey.length));
|
0, endKey.length));
|
||||||
}
|
}
|
||||||
@ -1094,9 +1140,31 @@ public Scanner createScanner(byte[] beginKey, byte[] endKey)
|
|||||||
* @return The actual coverage of the returned scanner will cover all keys
|
* @return The actual coverage of the returned scanner will cover all keys
|
||||||
* greater than or equal to the beginKey and less than the endKey.
|
* greater than or equal to the beginKey and less than the endKey.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
*
|
||||||
|
* @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)}
|
||||||
|
* instead.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
|
public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
return createScannerByKey(beginKey, endKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a scanner that covers a specific key range.
|
||||||
|
*
|
||||||
|
* @param beginKey
|
||||||
|
* Begin key of the scan (inclusive). If null, scan from the first
|
||||||
|
* key-value entry of the TFile.
|
||||||
|
* @param endKey
|
||||||
|
* End key of the scan (exclusive). If null, scan up to the last
|
||||||
|
* key-value entry of the TFile.
|
||||||
|
* @return The actual coverage of the returned scanner will cover all keys
|
||||||
|
* greater than or equal to the beginKey and less than the endKey.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey)
|
||||||
|
throws IOException {
|
||||||
if ((beginKey != null) && (endKey != null)
|
if ((beginKey != null) && (endKey != null)
|
||||||
&& (compareKeys(beginKey, endKey) >= 0)) {
|
&& (compareKeys(beginKey, endKey) >= 0)) {
|
||||||
return new Scanner(this, beginKey, beginKey);
|
return new Scanner(this, beginKey, beginKey);
|
||||||
@ -1104,6 +1172,27 @@ public Scanner createScanner(RawComparable beginKey, RawComparable endKey)
|
|||||||
return new Scanner(this, beginKey, endKey);
|
return new Scanner(this, beginKey, endKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a scanner that covers a range of records.
|
||||||
|
*
|
||||||
|
* @param beginRecNum
|
||||||
|
* The RecordNum for the first record (inclusive).
|
||||||
|
* @param endRecNum
|
||||||
|
* The RecordNum for the last record (exclusive). To scan the whole
|
||||||
|
* file, either specify endRecNum==-1 or endRecNum==getEntryCount().
|
||||||
|
* @return The TFile scanner that covers the specified range of records.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum)
|
||||||
|
throws IOException {
|
||||||
|
if (beginRecNum < 0) beginRecNum = 0;
|
||||||
|
if (endRecNum < 0 || endRecNum > getEntryCount()) {
|
||||||
|
endRecNum = getEntryCount();
|
||||||
|
}
|
||||||
|
return new Scanner(this, getLocationByRecordNum(beginRecNum),
|
||||||
|
getLocationByRecordNum(endRecNum));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The TFile Scanner. The Scanner has an implicit cursor, which, upon
|
* The TFile Scanner. The Scanner has an implicit cursor, which, upon
|
||||||
* creation, points to the first key-value pair in the scan range. If the
|
* creation, points to the first key-value pair in the scan range. If the
|
||||||
@ -1523,6 +1612,15 @@ public Entry entry() throws IOException {
|
|||||||
return new Entry();
|
return new Entry();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the RecordNum corresponding to the entry pointed by the cursor.
|
||||||
|
* @return The RecordNum corresponding to the entry pointed by the cursor.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public long getRecordNum() throws IOException {
|
||||||
|
return reader.getRecordNumByLocation(currentLocation);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal API. Comparing the key at cursor to user-specified key.
|
* Internal API. Comparing the key at cursor to user-specified key.
|
||||||
*
|
*
|
||||||
@ -2021,7 +2119,9 @@ static class TFileIndex {
|
|||||||
final static String BLOCK_NAME = "TFile.index";
|
final static String BLOCK_NAME = "TFile.index";
|
||||||
private ByteArray firstKey;
|
private ByteArray firstKey;
|
||||||
private final ArrayList<TFileIndexEntry> index;
|
private final ArrayList<TFileIndexEntry> index;
|
||||||
|
private final ArrayList<Long> recordNumIndex;
|
||||||
private final BytesComparator comparator;
|
private final BytesComparator comparator;
|
||||||
|
private long sum = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For reading from file.
|
* For reading from file.
|
||||||
@ -2031,6 +2131,7 @@ static class TFileIndex {
|
|||||||
public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
|
public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
index = new ArrayList<TFileIndexEntry>(entryCount);
|
index = new ArrayList<TFileIndexEntry>(entryCount);
|
||||||
|
recordNumIndex = new ArrayList<Long>(entryCount);
|
||||||
int size = Utils.readVInt(in); // size for the first key entry.
|
int size = Utils.readVInt(in); // size for the first key entry.
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
byte[] buffer = new byte[size];
|
byte[] buffer = new byte[size];
|
||||||
@ -2052,6 +2153,8 @@ public TFileIndex(int entryCount, DataInput in, BytesComparator comparator)
|
|||||||
new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
|
new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream(
|
||||||
buffer, 0, size)));
|
buffer, 0, size)));
|
||||||
index.add(idx);
|
index.add(idx);
|
||||||
|
sum += idx.entries();
|
||||||
|
recordNumIndex.add(sum);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (entryCount != 0) {
|
if (entryCount != 0) {
|
||||||
@ -2083,6 +2186,12 @@ public int lowerBound(RawComparable key) {
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param key
|
||||||
|
* input key.
|
||||||
|
* @return the ID of the first block that contains key > input key. Or -1
|
||||||
|
* if no such block exists.
|
||||||
|
*/
|
||||||
public int upperBound(RawComparable key) {
|
public int upperBound(RawComparable key) {
|
||||||
if (comparator == null) {
|
if (comparator == null) {
|
||||||
throw new RuntimeException("Cannot search in unsorted TFile");
|
throw new RuntimeException("Cannot search in unsorted TFile");
|
||||||
@ -2104,6 +2213,7 @@ public int upperBound(RawComparable key) {
|
|||||||
*/
|
*/
|
||||||
public TFileIndex(BytesComparator comparator) {
|
public TFileIndex(BytesComparator comparator) {
|
||||||
index = new ArrayList<TFileIndexEntry>();
|
index = new ArrayList<TFileIndexEntry>();
|
||||||
|
recordNumIndex = new ArrayList<Long>();
|
||||||
this.comparator = comparator;
|
this.comparator = comparator;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2111,6 +2221,18 @@ public RawComparable getFirstKey() {
|
|||||||
return firstKey;
|
return firstKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Reader.Location getLocationByRecordNum(long recNum) {
|
||||||
|
int idx = Utils.upperBound(recordNumIndex, recNum);
|
||||||
|
long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1);
|
||||||
|
return new Reader.Location(idx, recNum-lastRecNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getRecordNumByLocation(Reader.Location location) {
|
||||||
|
int blkIndex = location.getBlockIndex();
|
||||||
|
long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1);
|
||||||
|
return lastRecNum + location.getRecordIndex();
|
||||||
|
}
|
||||||
|
|
||||||
public void setFirstKey(byte[] key, int offset, int length) {
|
public void setFirstKey(byte[] key, int offset, int length) {
|
||||||
firstKey = new ByteArray(new byte[length]);
|
firstKey = new ByteArray(new byte[length]);
|
||||||
System.arraycopy(key, offset, firstKey.buffer(), 0, length);
|
System.arraycopy(key, offset, firstKey.buffer(), 0, length);
|
||||||
@ -2125,6 +2247,8 @@ public RawComparable getLastKey() {
|
|||||||
|
|
||||||
public void addEntry(TFileIndexEntry keyEntry) {
|
public void addEntry(TFileIndexEntry keyEntry) {
|
||||||
index.add(keyEntry);
|
index.add(keyEntry);
|
||||||
|
sum += keyEntry.entries();
|
||||||
|
recordNumIndex.add(sum);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TFileIndexEntry getEntry(int bid) {
|
public TFileIndexEntry getEntry(int bid) {
|
||||||
|
@ -319,7 +319,7 @@ void basicWithSomeCodec(String codec) throws IOException {
|
|||||||
|
|
||||||
scanner.close();
|
scanner.close();
|
||||||
// test for a range of scanner
|
// test for a range of scanner
|
||||||
scanner = reader.createScanner(getSomeKey(10), getSomeKey(60));
|
scanner = reader.createScannerByKey(getSomeKey(10), getSomeKey(60));
|
||||||
readAndCheckbytes(scanner, 10, 50);
|
readAndCheckbytes(scanner, 10, 50);
|
||||||
assertFalse(scanner.advance());
|
assertFalse(scanner.advance());
|
||||||
scanner.close();
|
scanner.close();
|
||||||
|
@ -673,7 +673,7 @@ private void readValueBeforeKey(int count, int recordIndex)
|
|||||||
Reader reader =
|
Reader reader =
|
||||||
new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
||||||
Scanner scanner =
|
Scanner scanner =
|
||||||
reader.createScanner(composeSortedKey(KEY, count, recordIndex)
|
reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
|
||||||
.getBytes(), null);
|
.getBytes(), null);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -698,7 +698,7 @@ private void readKeyWithoutValue(int count, int recordIndex)
|
|||||||
throws IOException {
|
throws IOException {
|
||||||
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
||||||
Scanner scanner =
|
Scanner scanner =
|
||||||
reader.createScanner(composeSortedKey(KEY, count, recordIndex)
|
reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
|
||||||
.getBytes(), null);
|
.getBytes(), null);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -729,7 +729,7 @@ private void readValueWithoutKey(int count, int recordIndex)
|
|||||||
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
||||||
|
|
||||||
Scanner scanner =
|
Scanner scanner =
|
||||||
reader.createScanner(composeSortedKey(KEY, count, recordIndex)
|
reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
|
||||||
.getBytes(), null);
|
.getBytes(), null);
|
||||||
|
|
||||||
byte[] vbuf1 = new byte[BUF_SIZE];
|
byte[] vbuf1 = new byte[BUF_SIZE];
|
||||||
@ -753,7 +753,7 @@ private void readKeyManyTimes(int count, int recordIndex) throws IOException {
|
|||||||
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
Reader reader = new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
||||||
|
|
||||||
Scanner scanner =
|
Scanner scanner =
|
||||||
reader.createScanner(composeSortedKey(KEY, count, recordIndex)
|
reader.createScannerByKey(composeSortedKey(KEY, count, recordIndex)
|
||||||
.getBytes(), null);
|
.getBytes(), null);
|
||||||
|
|
||||||
// read the indexed key
|
// read the indexed key
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package org.apache.hadoop.io.file.tfile;
|
package org.apache.hadoop.io.file.tfile;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
import junit.framework.TestCase;
|
import junit.framework.TestCase;
|
||||||
@ -42,6 +43,7 @@ public class TestTFileSplit extends TestCase {
|
|||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private Path path;
|
private Path path;
|
||||||
|
private Random random = new Random();
|
||||||
|
|
||||||
private String comparator = "memcmp";
|
private String comparator = "memcmp";
|
||||||
private String outputFile = "TestTFileSplit";
|
private String outputFile = "TestTFileSplit";
|
||||||
@ -74,7 +76,7 @@ void readFile() throws IOException {
|
|||||||
long rowCount = 0;
|
long rowCount = 0;
|
||||||
BytesWritable key, value;
|
BytesWritable key, value;
|
||||||
for (int i = 0; i < numSplit; ++i, offset += splitSize) {
|
for (int i = 0; i < numSplit; ++i, offset += splitSize) {
|
||||||
Scanner scanner = reader.createScanner(offset, splitSize);
|
Scanner scanner = reader.createScannerByByteRange(offset, splitSize);
|
||||||
int count = 0;
|
int count = 0;
|
||||||
key = new BytesWritable();
|
key = new BytesWritable();
|
||||||
value = new BytesWritable();
|
value = new BytesWritable();
|
||||||
@ -91,17 +93,100 @@ void readFile() throws IOException {
|
|||||||
reader.close();
|
reader.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Similar to readFile(), tests the scanner created
|
||||||
|
* by record numbers rather than the offsets.
|
||||||
|
*/
|
||||||
|
void readRowSplits(int numSplits) throws IOException {
|
||||||
|
|
||||||
|
Reader reader =
|
||||||
|
new Reader(fs.open(path), fs.getFileStatus(path).getLen(), conf);
|
||||||
|
|
||||||
|
long totalRecords = reader.getEntryCount();
|
||||||
|
for (int i=0; i<numSplits; i++) {
|
||||||
|
long startRec = i*totalRecords/numSplits;
|
||||||
|
long endRec = (i+1)*totalRecords/numSplits;
|
||||||
|
if (i == numSplits-1) {
|
||||||
|
endRec = totalRecords;
|
||||||
|
}
|
||||||
|
Scanner scanner = reader.createScannerByRecordNum(startRec, endRec);
|
||||||
|
int count = 0;
|
||||||
|
BytesWritable key = new BytesWritable();
|
||||||
|
BytesWritable value = new BytesWritable();
|
||||||
|
long x=startRec;
|
||||||
|
while (!scanner.atEnd()) {
|
||||||
|
assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
|
||||||
|
scanner.entry().get(key, value);
|
||||||
|
++count;
|
||||||
|
assertEquals("Incorrect RecNum returned by scanner", scanner.getRecordNum(), x);
|
||||||
|
scanner.advance();
|
||||||
|
++x;
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
Assert.assertTrue(count == (endRec - startRec));
|
||||||
|
}
|
||||||
|
// make sure specifying range at the end gives zero records.
|
||||||
|
Scanner scanner = reader.createScannerByRecordNum(totalRecords, -1);
|
||||||
|
Assert.assertTrue(scanner.atEnd());
|
||||||
|
}
|
||||||
|
|
||||||
static String composeSortedKey(String prefix, int total, int value) {
|
static String composeSortedKey(String prefix, int total, int value) {
|
||||||
return String.format("%s%010d", prefix, value);
|
return String.format("%s%010d", prefix, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void checkRecNums() throws IOException {
|
||||||
|
long fileLen = fs.getFileStatus(path).getLen();
|
||||||
|
Reader reader = new Reader(fs.open(path), fileLen, conf);
|
||||||
|
long totalRecs = reader.getEntryCount();
|
||||||
|
long begin = random.nextLong() % (totalRecs / 2);
|
||||||
|
if (begin < 0)
|
||||||
|
begin += (totalRecs / 2);
|
||||||
|
long end = random.nextLong() % (totalRecs / 2);
|
||||||
|
if (end < 0)
|
||||||
|
end += (totalRecs / 2);
|
||||||
|
end += (totalRecs / 2) + 1;
|
||||||
|
|
||||||
|
assertEquals("RecNum for offset=0 should be 0", 0, reader
|
||||||
|
.getRecordNumNear(0));
|
||||||
|
for (long x : new long[] { fileLen, fileLen + 1, 2 * fileLen }) {
|
||||||
|
assertEquals("RecNum for offset>=fileLen should be total entries",
|
||||||
|
totalRecs, reader.getRecordNumNear(x));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (long i = 0; i < 100; ++i) {
|
||||||
|
assertEquals("Locaton to RecNum conversion not symmetric", i, reader
|
||||||
|
.getRecordNumByLocation(reader.getLocationByRecordNum(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (long i = 1; i < 100; ++i) {
|
||||||
|
long x = totalRecs - i;
|
||||||
|
assertEquals("Locaton to RecNum conversion not symmetric", x, reader
|
||||||
|
.getRecordNumByLocation(reader.getLocationByRecordNum(x)));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (long i = begin; i < end; ++i) {
|
||||||
|
assertEquals("Locaton to RecNum conversion not symmetric", i, reader
|
||||||
|
.getRecordNumByLocation(reader.getLocationByRecordNum(i)));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; ++i) {
|
||||||
|
long x = random.nextLong() % totalRecs;
|
||||||
|
if (x < 0) x += totalRecs;
|
||||||
|
assertEquals("Locaton to RecNum conversion not symmetric", x, reader
|
||||||
|
.getRecordNumByLocation(reader.getLocationByRecordNum(x)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testSplit() throws IOException {
|
public void testSplit() throws IOException {
|
||||||
System.out.println("testSplit");
|
System.out.println("testSplit");
|
||||||
createFile(100000, Compression.Algorithm.NONE.getName());
|
createFile(100000, Compression.Algorithm.NONE.getName());
|
||||||
|
checkRecNums();
|
||||||
readFile();
|
readFile();
|
||||||
|
readRowSplits(10);
|
||||||
fs.delete(path, true);
|
fs.delete(path, true);
|
||||||
createFile(500000, Compression.Algorithm.GZ.getName());
|
createFile(500000, Compression.Algorithm.GZ.getName());
|
||||||
|
checkRecNums();
|
||||||
readFile();
|
readFile();
|
||||||
|
readRowSplits(83);
|
||||||
fs.delete(path, true);
|
fs.delete(path, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -89,7 +89,7 @@ public void testFailureScannerWithKeys() throws IOException {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
Scanner scanner =
|
Scanner scanner =
|
||||||
reader.createScanner("aaa".getBytes(), "zzz".getBytes());
|
reader.createScannerByKey("aaa".getBytes(), "zzz".getBytes());
|
||||||
Assert
|
Assert
|
||||||
.fail("Failed to catch creating scanner with keys on unsorted file.");
|
.fail("Failed to catch creating scanner with keys on unsorted file.");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user