HADOOP-18392. Propagate vectored s3a input stream stats to file system stats. (#4704)
part of HADOOP-18103. Contributed By: Mukund Thakur
This commit is contained in:
parent
e9509ac467
commit
b28e4c6904
@ -24,11 +24,10 @@
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.IntFunction;
|
||||
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
@ -43,13 +42,14 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.impl.FutureIOSupport;
|
||||
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
|
||||
@ -281,16 +281,11 @@ public void testEOFRanges() throws Exception {
|
||||
in.readVectored(fileRanges, allocate);
|
||||
for (FileRange res : fileRanges) {
|
||||
CompletableFuture<ByteBuffer> data = res.getData();
|
||||
try {
|
||||
ByteBuffer buffer = data.get();
|
||||
// Shouldn't reach here.
|
||||
Assert.fail("EOFException must be thrown while reading EOF");
|
||||
} catch (ExecutionException ex) {
|
||||
// ignore as expected.
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Exception while running vectored read ", ex);
|
||||
Assert.fail("Exception while running vectored read " + ex);
|
||||
}
|
||||
interceptFuture(EOFException.class,
|
||||
"",
|
||||
ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
|
||||
TimeUnit.SECONDS,
|
||||
data);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -410,7 +405,7 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.build();
|
||||
try (FSDataInputStream in = builder.get()) {
|
||||
LambdaTestUtils.intercept(clazz,
|
||||
intercept(clazz,
|
||||
() -> in.readVectored(fileRanges, allocate));
|
||||
}
|
||||
}
|
||||
|
@ -308,6 +308,23 @@ public enum Statistic {
|
||||
StreamStatisticNames.STREAM_READ_OPERATIONS,
|
||||
"Count of read() operations in an input stream",
|
||||
TYPE_COUNTER),
|
||||
STREAM_READ_VECTORED_OPERATIONS(
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
"Count of readVectored() operations in an input stream.",
|
||||
TYPE_COUNTER),
|
||||
STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
"Count of bytes discarded during readVectored() operation." +
|
||||
" in an input stream",
|
||||
TYPE_COUNTER),
|
||||
STREAM_READ_VECTORED_INCOMING_RANGES(
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
|
||||
"Count of incoming file ranges during readVectored() operation.",
|
||||
TYPE_COUNTER),
|
||||
STREAM_READ_VECTORED_COMBINED_RANGES(
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
|
||||
"Count of combined file ranges during readVectored() operation.",
|
||||
TYPE_COUNTER),
|
||||
STREAM_READ_REMOTE_STREAM_ABORTED(
|
||||
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
|
||||
"Duration of aborting a remote stream during stream IO",
|
||||
|
@ -176,146 +176,172 @@ public void testSameRanges() throws Exception {
|
||||
* */
|
||||
@Test
|
||||
public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
|
||||
FileSystem fs = getTestFileSystemWithReadAheadDisabled();
|
||||
List<FileRange> fileRanges = new ArrayList<>();
|
||||
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
|
||||
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
|
||||
|
||||
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
||||
CompletableFuture<FSDataInputStream> builder =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
try (FSDataInputStream in = builder.get()) {
|
||||
in.readVectored(fileRanges, getAllocate());
|
||||
validateVectoredReadResult(fileRanges, DATASET);
|
||||
returnBuffersToPoolPostRead(fileRanges, getPool());
|
||||
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
|
||||
List<FileRange> fileRanges = new ArrayList<>();
|
||||
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
|
||||
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
|
||||
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
|
||||
|
||||
// audit the io statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
|
||||
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
||||
CompletableFuture<FSDataInputStream> builder =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
try (FSDataInputStream in = builder.get()) {
|
||||
in.readVectored(fileRanges, getAllocate());
|
||||
validateVectoredReadResult(fileRanges, DATASET);
|
||||
returnBuffersToPoolPostRead(fileRanges, getPool());
|
||||
|
||||
// the vectored io operation must be tracked
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
1);
|
||||
// audit the io statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
|
||||
|
||||
// the vectored io operation is being called with 5 input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
|
||||
5);
|
||||
// the vectored io operation must be tracked
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
1);
|
||||
|
||||
// 5 input ranges got combined in 3 as some of them are close.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
|
||||
3);
|
||||
// the vectored io operation is being called with 5 input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
|
||||
5);
|
||||
|
||||
// number of bytes discarded will be based on the above input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
5944);
|
||||
// 5 input ranges got combined in 3 as some of them are close.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
|
||||
3);
|
||||
|
||||
verifyStatisticCounterValue(st,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
3);
|
||||
// number of bytes discarded will be based on the above input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
5944);
|
||||
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
1424);
|
||||
verifyStatisticCounterValue(st,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
3);
|
||||
|
||||
}
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
1424);
|
||||
|
||||
CompletableFuture<FSDataInputStream> builder1 =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
|
||||
try (FSDataInputStream in = builder1.get()) {
|
||||
for (FileRange range : fileRanges) {
|
||||
byte[] temp = new byte[range.getLength()];
|
||||
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
|
||||
}
|
||||
|
||||
// audit the statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
|
||||
CompletableFuture<FSDataInputStream> builder1 =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
|
||||
verifyStatisticCounterValue(st,
|
||||
try (FSDataInputStream in = builder1.get()) {
|
||||
for (FileRange range : fileRanges) {
|
||||
byte[] temp = new byte[range.getLength()];
|
||||
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
|
||||
}
|
||||
|
||||
// audit the statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
|
||||
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
0);
|
||||
|
||||
// all other counter values consistent.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
0);
|
||||
verifyStatisticCounterValue(st,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
5);
|
||||
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
1424);
|
||||
}
|
||||
// validate stats are getting merged at fs instance level.
|
||||
IOStatistics fsStats = fs.getIOStatistics();
|
||||
// only 1 vectored io call is made in this fs instance.
|
||||
verifyStatisticCounterValue(fsStats,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
0);
|
||||
|
||||
// all other counter values consistent.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
0);
|
||||
verifyStatisticCounterValue(st,
|
||||
1);
|
||||
// 8 get requests were made in this fs instance.
|
||||
verifyStatisticCounterValue(fsStats,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
5);
|
||||
8);
|
||||
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
verifyStatisticCounterValue(fsStats,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
1424);
|
||||
2848);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiVectoredReadStatsCollection() throws Exception {
|
||||
FileSystem fs = getTestFileSystemWithReadAheadDisabled();
|
||||
List<FileRange> ranges1 = getConsecutiveRanges();
|
||||
List<FileRange> ranges2 = getConsecutiveRanges();
|
||||
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
||||
CompletableFuture<FSDataInputStream> builder =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
try (FSDataInputStream in = builder.get()) {
|
||||
in.readVectored(ranges1, getAllocate());
|
||||
in.readVectored(ranges2, getAllocate());
|
||||
validateVectoredReadResult(ranges1, DATASET);
|
||||
validateVectoredReadResult(ranges2, DATASET);
|
||||
returnBuffersToPoolPostRead(ranges1, getPool());
|
||||
returnBuffersToPoolPostRead(ranges2, getPool());
|
||||
try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
|
||||
List<FileRange> ranges1 = getConsecutiveRanges();
|
||||
List<FileRange> ranges2 = getConsecutiveRanges();
|
||||
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
|
||||
CompletableFuture<FSDataInputStream> builder =
|
||||
fs.openFile(path(VECTORED_READ_FILE_NAME))
|
||||
.withFileStatus(fileStatus)
|
||||
.build();
|
||||
try (FSDataInputStream in = builder.get()) {
|
||||
in.readVectored(ranges1, getAllocate());
|
||||
in.readVectored(ranges2, getAllocate());
|
||||
validateVectoredReadResult(ranges1, DATASET);
|
||||
validateVectoredReadResult(ranges2, DATASET);
|
||||
returnBuffersToPoolPostRead(ranges1, getPool());
|
||||
returnBuffersToPoolPostRead(ranges2, getPool());
|
||||
|
||||
// audit the io statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
// audit the io statistics for this stream
|
||||
IOStatistics st = in.getIOStatistics();
|
||||
|
||||
// 2 vectored io calls are made above.
|
||||
verifyStatisticCounterValue(st,
|
||||
// 2 vectored io calls are made above.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
2);
|
||||
|
||||
// 2 vectored io operation is being called with 2 input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
|
||||
4);
|
||||
|
||||
// 2 ranges are getting merged in 1 during both vectored io operation.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
|
||||
2);
|
||||
|
||||
// number of bytes discarded will be 0 as the ranges are consecutive.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
0);
|
||||
// only 2 http get request will be made because ranges in both range list will be merged
|
||||
// to 1 because they are consecutive.
|
||||
verifyStatisticCounterValue(st,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
2);
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
2000);
|
||||
}
|
||||
IOStatistics fsStats = fs.getIOStatistics();
|
||||
// 2 vectored io calls are made in this fs instance.
|
||||
verifyStatisticCounterValue(fsStats,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
|
||||
2);
|
||||
|
||||
// 2 vectored io operation is being called with 2 input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
|
||||
4);
|
||||
|
||||
// 2 ranges are getting merged in 1 during both vectored io operation.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
|
||||
2);
|
||||
|
||||
// number of bytes discarded will be 0 as the ranges are consecutive.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
|
||||
0);
|
||||
// only 2 http get request will be made because ranges in both range list will be merged
|
||||
// to 1 because they are consecutive.
|
||||
verifyStatisticCounterValue(st,
|
||||
// 2 get requests were made in this fs instance.
|
||||
verifyStatisticCounterValue(fsStats,
|
||||
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
|
||||
2);
|
||||
// read bytes should match the sum of requested length for each input ranges.
|
||||
verifyStatisticCounterValue(st,
|
||||
StreamStatisticNames.STREAM_READ_BYTES,
|
||||
2000);
|
||||
}
|
||||
}
|
||||
|
||||
private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
|
||||
private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
|
||||
Configuration conf = getFileSystem().getConf();
|
||||
// also resetting the min seek and max size values is important
|
||||
// as this same test suite has test which overrides these params.
|
||||
|
Loading…
Reference in New Issue
Block a user