HADOOP-18392. Propagate vectored s3a input stream stats to file system stats. (#4704)

part of HADOOP-18103.

Contributed By: Mukund Thakur
This commit is contained in:
Mukund Thakur 2022-08-12 01:42:00 +05:30
parent 09c8084191
commit 93c4704b33
3 changed files with 161 additions and 123 deletions

View File

@ -24,11 +24,10 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction; import java.util.function.IntFunction;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
@ -43,13 +42,14 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport; import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase { public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@ -281,16 +281,11 @@ public void testEOFRanges() throws Exception {
in.readVectored(fileRanges, allocate); in.readVectored(fileRanges, allocate);
for (FileRange res : fileRanges) { for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData(); CompletableFuture<ByteBuffer> data = res.getData();
try { interceptFuture(EOFException.class,
ByteBuffer buffer = data.get(); "",
// Shouldn't reach here. ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
Assert.fail("EOFException must be thrown while reading EOF"); TimeUnit.SECONDS,
} catch (ExecutionException ex) { data);
// ignore as expected.
} catch (Exception ex) {
LOG.error("Exception while running vectored read ", ex);
Assert.fail("Exception while running vectored read " + ex);
}
} }
} }
} }
@ -410,7 +405,7 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
fs.openFile(path(VECTORED_READ_FILE_NAME)) fs.openFile(path(VECTORED_READ_FILE_NAME))
.build(); .build();
try (FSDataInputStream in = builder.get()) { try (FSDataInputStream in = builder.get()) {
LambdaTestUtils.intercept(clazz, intercept(clazz,
() -> in.readVectored(fileRanges, allocate)); () -> in.readVectored(fileRanges, allocate));
} }
} }

View File

@ -308,6 +308,23 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_OPERATIONS, StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream", "Count of read() operations in an input stream",
TYPE_COUNTER), TYPE_COUNTER),
STREAM_READ_VECTORED_OPERATIONS(
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
"Count of readVectored() operations in an input stream.",
TYPE_COUNTER),
STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
"Count of bytes discarded during readVectored() operation." +
" in an input stream",
TYPE_COUNTER),
STREAM_READ_VECTORED_INCOMING_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
"Count of incoming file ranges during readVectored() operation.",
TYPE_COUNTER),
STREAM_READ_VECTORED_COMBINED_RANGES(
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
"Count of combined file ranges during readVectored() operation.",
TYPE_COUNTER),
STREAM_READ_REMOTE_STREAM_ABORTED( STREAM_READ_REMOTE_STREAM_ABORTED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED, StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
"Duration of aborting a remote stream during stream IO", "Duration of aborting a remote stream during stream IO",

View File

@ -176,146 +176,172 @@ public void testSameRanges() throws Exception {
* */ * */
@Test @Test
public void testNormalReadVsVectoredReadStatsCollection() throws Exception { public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
FileSystem fs = getTestFileSystemWithReadAheadDisabled();
List<FileRange> fileRanges = new ArrayList<>();
fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
CompletableFuture<FSDataInputStream> builder = List<FileRange> fileRanges = new ArrayList<>();
fs.openFile(path(VECTORED_READ_FILE_NAME)) fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
.withFileStatus(fileStatus) fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
.build(); fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
try (FSDataInputStream in = builder.get()) { fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
in.readVectored(fileRanges, getAllocate()); fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, getPool());
// audit the io statistics for this stream FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
IOStatistics st = in.getIOStatistics(); CompletableFuture<FSDataInputStream> builder =
LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st)); fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder.get()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET);
returnBuffersToPoolPostRead(fileRanges, getPool());
// the vectored io operation must be tracked // audit the io statistics for this stream
verifyStatisticCounterValue(st, IOStatistics st = in.getIOStatistics();
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
1);
// the vectored io operation is being called with 5 input ranges. // the vectored io operation must be tracked
verifyStatisticCounterValue(st, verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
5); 1);
// 5 input ranges got combined in 3 as some of them are close. // the vectored io operation is being called with 5 input ranges.
verifyStatisticCounterValue(st, verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
3); 5);
// number of bytes discarded will be based on the above input ranges. // 5 input ranges got combined in 3 as some of them are close.
verifyStatisticCounterValue(st, verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
5944); 3);
verifyStatisticCounterValue(st, // number of bytes discarded will be based on the above input ranges.
StoreStatisticNames.ACTION_HTTP_GET_REQUEST, verifyStatisticCounterValue(st,
3); StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
5944);
// read bytes should match the sum of requested length for each input ranges. verifyStatisticCounterValue(st,
verifyStatisticCounterValue(st, StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
StreamStatisticNames.STREAM_READ_BYTES, 3);
1424);
} // read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
CompletableFuture<FSDataInputStream> builder1 =
fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus)
.build();
try (FSDataInputStream in = builder1.get()) {
for (FileRange range : fileRanges) {
byte[] temp = new byte[range.getLength()];
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
} }
// audit the statistics for this stream CompletableFuture<FSDataInputStream> builder1 =
IOStatistics st = in.getIOStatistics(); fs.openFile(path(VECTORED_READ_FILE_NAME))
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st)); .withFileStatus(fileStatus)
.build();
verifyStatisticCounterValue(st, try (FSDataInputStream in = builder1.get()) {
for (FileRange range : fileRanges) {
byte[] temp = new byte[range.getLength()];
in.readFully((int) range.getOffset(), temp, 0, range.getLength());
}
// audit the statistics for this stream
IOStatistics st = in.getIOStatistics();
LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
0);
// all other counter values consistent.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
5);
// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
1424);
}
// validate stats are getting merged at fs instance level.
IOStatistics fsStats = fs.getIOStatistics();
// only 1 vectored io call is made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
0); 1);
// 8 get requests were made in this fs instance.
// all other counter values consistent. verifyStatisticCounterValue(fsStats,
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST, StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
5); 8);
// read bytes should match the sum of requested length for each input ranges. verifyStatisticCounterValue(fsStats,
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES, StreamStatisticNames.STREAM_READ_BYTES,
1424); 2848);
} }
} }
@Test @Test
public void testMultiVectoredReadStatsCollection() throws Exception { public void testMultiVectoredReadStatsCollection() throws Exception {
FileSystem fs = getTestFileSystemWithReadAheadDisabled(); try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
List<FileRange> ranges1 = getConsecutiveRanges(); List<FileRange> ranges1 = getConsecutiveRanges();
List<FileRange> ranges2 = getConsecutiveRanges(); List<FileRange> ranges2 = getConsecutiveRanges();
FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
CompletableFuture<FSDataInputStream> builder = CompletableFuture<FSDataInputStream> builder =
fs.openFile(path(VECTORED_READ_FILE_NAME)) fs.openFile(path(VECTORED_READ_FILE_NAME))
.withFileStatus(fileStatus) .withFileStatus(fileStatus)
.build(); .build();
try (FSDataInputStream in = builder.get()) { try (FSDataInputStream in = builder.get()) {
in.readVectored(ranges1, getAllocate()); in.readVectored(ranges1, getAllocate());
in.readVectored(ranges2, getAllocate()); in.readVectored(ranges2, getAllocate());
validateVectoredReadResult(ranges1, DATASET); validateVectoredReadResult(ranges1, DATASET);
validateVectoredReadResult(ranges2, DATASET); validateVectoredReadResult(ranges2, DATASET);
returnBuffersToPoolPostRead(ranges1, getPool()); returnBuffersToPoolPostRead(ranges1, getPool());
returnBuffersToPoolPostRead(ranges2, getPool()); returnBuffersToPoolPostRead(ranges2, getPool());
// audit the io statistics for this stream // audit the io statistics for this stream
IOStatistics st = in.getIOStatistics(); IOStatistics st = in.getIOStatistics();
// 2 vectored io calls are made above. // 2 vectored io calls are made above.
verifyStatisticCounterValue(st, verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);
// 2 vectored io operation is being called with 2 input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
4);
// 2 ranges are getting merged in 1 during both vectored io operation.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
2);
// number of bytes discarded will be 0 as the ranges are consecutive.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
// only 2 http get request will be made because ranges in both range list will be merged
// to 1 because they are consecutive.
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
2000);
}
IOStatistics fsStats = fs.getIOStatistics();
// 2 vectored io calls are made in this fs instance.
verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2); 2);
// 2 get requests were made in this fs instance.
// 2 vectored io operation is being called with 2 input ranges. verifyStatisticCounterValue(fsStats,
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
4);
// 2 ranges are getting merged in 1 during both vectored io operation.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
2);
// number of bytes discarded will be 0 as the ranges are consecutive.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
0);
// only 2 http get request will be made because ranges in both range list will be merged
// to 1 because they are consecutive.
verifyStatisticCounterValue(st,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST, StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2); 2);
// read bytes should match the sum of requested length for each input ranges.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_BYTES,
2000);
} }
} }
private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException { private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
Configuration conf = getFileSystem().getConf(); Configuration conf = getFileSystem().getConf();
// also resetting the min seek and max size values is important // also resetting the min seek and max size values is important
// as this same test suite has test which overrides these params. // as this same test suite has test which overrides these params.