HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
Contributed by Steve Loughran
This commit is contained in:
parent
d33ee67151
commit
c392075761
@ -55,6 +55,15 @@ public interface FileRange {
|
||||
*/
|
||||
void setData(CompletableFuture<ByteBuffer> data);
|
||||
|
||||
/**
|
||||
* Get any reference passed in to the file range constructor.
|
||||
* This is not used by any implementation code; it is to help
|
||||
* bind this API to libraries retrieving multiple stripes of
|
||||
* data in parallel.
|
||||
* @return a reference or null.
|
||||
*/
|
||||
Object getReference();
|
||||
|
||||
/**
|
||||
* Factory method to create a FileRange object.
|
||||
* @param offset starting offset of the range.
|
||||
@ -62,6 +71,17 @@ public interface FileRange {
|
||||
* @return a new instance of FileRangeImpl.
|
||||
*/
|
||||
static FileRange createFileRange(long offset, int length) {
|
||||
return new FileRangeImpl(offset, length);
|
||||
return new FileRangeImpl(offset, length, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method to create a FileRange object.
|
||||
* @param offset starting offset of the range.
|
||||
* @param length length of the range.
|
||||
* @param reference nullable reference to store in the range.
|
||||
* @return a new instance of FileRangeImpl.
|
||||
*/
|
||||
static FileRange createFileRange(long offset, int length, Object reference) {
|
||||
return new FileRangeImpl(offset, length, reference);
|
||||
}
|
||||
}
|
||||
|
@ -29,10 +29,10 @@
|
||||
* together into a single read for efficiency.
|
||||
*/
|
||||
public class CombinedFileRange extends FileRangeImpl {
|
||||
private ArrayList<FileRange> underlying = new ArrayList<>();
|
||||
private List<FileRange> underlying = new ArrayList<>();
|
||||
|
||||
public CombinedFileRange(long offset, long end, FileRange original) {
|
||||
super(offset, (int) (end - offset));
|
||||
super(offset, (int) (end - offset), null);
|
||||
this.underlying.add(original);
|
||||
}
|
||||
|
||||
|
@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange {
|
||||
private int length;
|
||||
private CompletableFuture<ByteBuffer> reader;
|
||||
|
||||
public FileRangeImpl(long offset, int length) {
|
||||
/**
|
||||
* nullable reference to store in the range.
|
||||
*/
|
||||
private final Object reference;
|
||||
|
||||
/**
|
||||
* Create.
|
||||
* @param offset offset in file
|
||||
* @param length length of data to read.
|
||||
* @param reference nullable reference to store in the range.
|
||||
*/
|
||||
public FileRangeImpl(long offset, int length, Object reference) {
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.reference = reference;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -71,4 +83,9 @@ public void setData(CompletableFuture<ByteBuffer> pReader) {
|
||||
public CompletableFuture<ByteBuffer> getData() {
|
||||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getReference() {
|
||||
return reference;
|
||||
}
|
||||
}
|
||||
|
@ -96,7 +96,10 @@ public void testRounding() {
|
||||
|
||||
@Test
|
||||
public void testMerge() {
|
||||
FileRange base = FileRange.createFileRange(2000, 1000);
|
||||
// a reference to use for tracking
|
||||
Object tracker1 = "one";
|
||||
Object tracker2 = "two";
|
||||
FileRange base = FileRange.createFileRange(2000, 1000, tracker1);
|
||||
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
|
||||
|
||||
// test when the gap between is too big
|
||||
@ -104,44 +107,48 @@ public void testMerge() {
|
||||
FileRange.createFileRange(5000, 1000), 2000, 4000));
|
||||
assertEquals("Number of ranges in merged range shouldn't increase",
|
||||
1, mergeBase.getUnderlying().size());
|
||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
||||
assertEquals("post merge length", 1000, mergeBase.getLength());
|
||||
assertFileRange(mergeBase, 2000, 1000);
|
||||
|
||||
// test when the total size gets exceeded
|
||||
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
|
||||
FileRange.createFileRange(5000, 1000), 2001, 3999));
|
||||
assertEquals("Number of ranges in merged range shouldn't increase",
|
||||
1, mergeBase.getUnderlying().size());
|
||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
||||
assertEquals("post merge length", 1000, mergeBase.getLength());
|
||||
assertFileRange(mergeBase, 2000, 1000);
|
||||
|
||||
// test when the merge works
|
||||
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
|
||||
FileRange.createFileRange(5000, 1000), 2001, 4000));
|
||||
FileRange.createFileRange(5000, 1000, tracker2),
|
||||
2001, 4000));
|
||||
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
||||
assertEquals("post merge offset", 2000, mergeBase.getOffset());
|
||||
assertEquals("post merge length", 4000, mergeBase.getLength());
|
||||
assertFileRange(mergeBase, 2000, 4000);
|
||||
|
||||
Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference())
|
||||
.describedAs("reference of range %s", mergeBase.getUnderlying().get(0))
|
||||
.isSameAs(tracker1);
|
||||
Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference())
|
||||
.describedAs("reference of range %s", mergeBase.getUnderlying().get(1))
|
||||
.isSameAs(tracker2);
|
||||
|
||||
// reset the mergeBase and test with a 10:1 reduction
|
||||
mergeBase = new CombinedFileRange(200, 300, base);
|
||||
assertEquals(200, mergeBase.getOffset());
|
||||
assertEquals(100, mergeBase.getLength());
|
||||
assertFileRange(mergeBase, 200, 100);
|
||||
|
||||
assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
|
||||
FileRange.createFileRange(5000, 1000), 201, 400));
|
||||
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
|
||||
assertEquals("post merge offset", 200, mergeBase.getOffset());
|
||||
assertEquals("post merge length", 400, mergeBase.getLength());
|
||||
assertFileRange(mergeBase, 200, 400);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortAndMerge() {
|
||||
List<FileRange> input = Arrays.asList(
|
||||
FileRange.createFileRange(3000, 100),
|
||||
FileRange.createFileRange(2100, 100),
|
||||
FileRange.createFileRange(1000, 100)
|
||||
FileRange.createFileRange(3000, 100, "1"),
|
||||
FileRange.createFileRange(2100, 100, null),
|
||||
FileRange.createFileRange(1000, 100, "3")
|
||||
);
|
||||
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
||||
List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
|
||||
final List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
|
||||
Arrays.asList(sortRanges(input)), 100, 1001, 2500);
|
||||
Assertions.assertThat(outputList)
|
||||
.describedAs("merged range size")
|
||||
@ -150,51 +157,105 @@ public void testSortAndMerge() {
|
||||
Assertions.assertThat(output.getUnderlying())
|
||||
.describedAs("merged range underlying size")
|
||||
.hasSize(3);
|
||||
assertEquals("range[1000,3100)", output.toString());
|
||||
// range[1000,3100)
|
||||
assertFileRange(output, 1000, 2100);
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
|
||||
|
||||
// the minSeek doesn't allow the first two to merge
|
||||
assertFalse("Ranges are non disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
|
||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
||||
final List<CombinedFileRange> list2 = VectoredReadUtils.mergeSortedRanges(
|
||||
Arrays.asList(sortRanges(input)),
|
||||
100, 1000, 2100);
|
||||
Assertions.assertThat(outputList)
|
||||
Assertions.assertThat(list2)
|
||||
.describedAs("merged range size")
|
||||
.hasSize(2);
|
||||
assertEquals("range[1000,1100)", outputList.get(0).toString());
|
||||
assertEquals("range[2100,3100)", outputList.get(1).toString());
|
||||
assertFileRange(list2.get(0), 1000, 100);
|
||||
|
||||
// range[2100,3100)
|
||||
assertFileRange(list2.get(1), 2100, 1000);
|
||||
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
|
||||
VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000));
|
||||
|
||||
// the maxSize doesn't allow the third range to merge
|
||||
assertFalse("Ranges are non disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
|
||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
||||
final List<CombinedFileRange> list3 = VectoredReadUtils.mergeSortedRanges(
|
||||
Arrays.asList(sortRanges(input)),
|
||||
100, 1001, 2099);
|
||||
Assertions.assertThat(outputList)
|
||||
Assertions.assertThat(list3)
|
||||
.describedAs("merged range size")
|
||||
.hasSize(2);
|
||||
assertEquals("range[1000,2200)", outputList.get(0).toString());
|
||||
assertEquals("range[3000,3100)", outputList.get(1).toString());
|
||||
// range[1000,2200)
|
||||
CombinedFileRange range0 = list3.get(0);
|
||||
assertFileRange(range0, 1000, 1200);
|
||||
assertFileRange(range0.getUnderlying().get(0),
|
||||
1000, 100, "3");
|
||||
assertFileRange(range0.getUnderlying().get(1),
|
||||
2100, 100, null);
|
||||
CombinedFileRange range1 = list3.get(1);
|
||||
// range[3000,3100)
|
||||
assertFileRange(range1, 3000, 100);
|
||||
assertFileRange(range1.getUnderlying().get(0),
|
||||
3000, 100, "1");
|
||||
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
|
||||
VectoredReadUtils.isOrderedDisjoint(list3, 100, 800));
|
||||
|
||||
// test the round up and round down (the maxSize doesn't allow any merges)
|
||||
assertFalse("Ranges are non disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
|
||||
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
|
||||
final List<CombinedFileRange> list4 = VectoredReadUtils.mergeSortedRanges(
|
||||
Arrays.asList(sortRanges(input)),
|
||||
16, 1001, 100);
|
||||
Assertions.assertThat(outputList)
|
||||
Assertions.assertThat(list4)
|
||||
.describedAs("merged range size")
|
||||
.hasSize(3);
|
||||
assertEquals("range[992,1104)", outputList.get(0).toString());
|
||||
assertEquals("range[2096,2208)", outputList.get(1).toString());
|
||||
assertEquals("range[2992,3104)", outputList.get(2).toString());
|
||||
// range[992,1104)
|
||||
assertFileRange(list4.get(0), 992, 112);
|
||||
// range[2096,2208)
|
||||
assertFileRange(list4.get(1), 2096, 112);
|
||||
// range[2992,3104)
|
||||
assertFileRange(list4.get(2), 2992, 112);
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
|
||||
VectoredReadUtils.isOrderedDisjoint(list4, 16, 700));
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a file range satisfies the conditions.
|
||||
* @param range range to validate
|
||||
* @param offset offset of range
|
||||
* @param length range length
|
||||
*/
|
||||
private void assertFileRange(FileRange range, long offset, int length) {
|
||||
Assertions.assertThat(range)
|
||||
.describedAs("file range %s", range)
|
||||
.isNotNull();
|
||||
Assertions.assertThat(range.getOffset())
|
||||
.describedAs("offset of %s", range)
|
||||
.isEqualTo(offset);
|
||||
Assertions.assertThat(range.getLength())
|
||||
.describedAs("length of %s", range)
|
||||
.isEqualTo(length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a file range satisfies the conditions.
|
||||
* @param range range to validate
|
||||
* @param offset offset of range
|
||||
* @param length range length
|
||||
* @param reference reference; may be null.
|
||||
*/
|
||||
private void assertFileRange(FileRange range, long offset, int length, Object reference) {
|
||||
assertFileRange(range, offset, length);
|
||||
Assertions.assertThat(range.getReference())
|
||||
.describedAs("reference field of file range %s", range)
|
||||
.isEqualTo(reference);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSortAndMergeMoreCases() throws Exception {
|
||||
List<FileRange> input = Arrays.asList(
|
||||
@ -214,7 +275,9 @@ public void testSortAndMergeMoreCases() throws Exception {
|
||||
Assertions.assertThat(output.getUnderlying())
|
||||
.describedAs("merged range underlying size")
|
||||
.hasSize(4);
|
||||
assertEquals("range[1000,3110)", output.toString());
|
||||
|
||||
assertFileRange(output, 1000, 2110);
|
||||
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
||||
|
||||
@ -227,7 +290,8 @@ public void testSortAndMergeMoreCases() throws Exception {
|
||||
Assertions.assertThat(output.getUnderlying())
|
||||
.describedAs("merged range underlying size")
|
||||
.hasSize(4);
|
||||
assertEquals("range[1000,3200)", output.toString());
|
||||
assertFileRange(output, 1000, 2200);
|
||||
|
||||
assertTrue("merged output ranges are disjoint",
|
||||
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
|
||||
|
||||
|
@ -169,7 +169,7 @@ static class FileRangeCallback extends FileRangeImpl implements
|
||||
|
||||
FileRangeCallback(AsynchronousFileChannel channel, long offset,
|
||||
int length, Joiner joiner, ByteBuffer buffer) {
|
||||
super(offset, length);
|
||||
super(offset, length, null);
|
||||
this.channel = channel;
|
||||
this.joiner = joiner;
|
||||
this.buffer = buffer;
|
||||
|
Loading…
Reference in New Issue
Block a user